From f0e17d67537ee020ec391cd4459b77dfdad7ef24 Mon Sep 17 00:00:00 2001 From: default Date: Thu, 17 Nov 2022 18:33:54 +0100 Subject: [PATCH] Queue messages to inboxes instead of actors. --- activitypub.c | 41 +++++++++++++++++++++++++++++++++-------- data.c | 10 +++++----- snac.h | 2 +- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/activitypub.c b/activitypub.c index 0f801f7..7059471 100644 --- a/activitypub.c +++ b/activitypub.c @@ -212,6 +212,28 @@ d_char *recipient_list(snac *snac, char *msg, int expand_public) } +d_char *inbox_list(snac *snac, char *msg) +/* returns the list of inboxes that are recipients of this message */ +{ + d_char *list = xs_list_new(); + xs *rcpts = recipient_list(snac, msg, 1); + char *p, *v; + + p = rcpts; + while (xs_list_iter(&p, &v)) { + xs *inbox; + + if ((inbox = get_actor_inbox(snac, v)) != NULL) { + /* add the inbox if it's not already there */ + if (xs_list_in(list, inbox) == -1) + list = xs_list_append(list, inbox); + } + } + + return list; +} + + int is_msg_public(snac *snac, char *msg) /* checks if a message is public */ { @@ -930,27 +952,30 @@ void process_queue(snac *snac) if (strcmp(type, "output") == 0) { int status; - char *actor = xs_dict_get(q_item, "actor"); + char *inbox = xs_dict_get(q_item, "inbox"); char *msg = xs_dict_get(q_item, "object"); int retries = xs_number_get(xs_dict_get(q_item, "retries")); xs *payload = NULL; int p_size = 0; + if (xs_is_null(inbox) || xs_is_null(msg)) + continue; + /* deliver */ - status = send_to_actor(snac, actor, msg, &payload, &p_size); + status = send_to_inbox(snac, inbox, msg, &payload, &p_size); if (!valid_status(status)) { /* error sending; requeue? */ if (retries > queue_retry_max) - snac_log(snac, xs_fmt("process_queue giving up %s %d", actor, status)); + snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status)); else { /* requeue */ - enqueue_output(snac, msg, actor, retries + 1); - snac_log(snac, xs_fmt("process_queue requeue %s %d", actor, retries + 1)); + enqueue_output(snac, msg, inbox, retries + 1); + snac_log(snac, xs_fmt("process_queue requeue %s %d", inbox, retries + 1)); } } else - snac_log(snac, xs_fmt("process_queue sent to actor %s %d", actor, status)); + snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status)); } else if (strcmp(type, "input") == 0) { @@ -1005,10 +1030,10 @@ void process_queue(snac *snac) void post(snac *snac, char *msg) /* enqueues a message to all its recipients */ { - xs *rcpts = recipient_list(snac, msg, 1); + xs *inboxes = inbox_list(snac, msg); char *p, *v; - p = rcpts; + p = inboxes; while (xs_list_iter(&p, &v)) { enqueue_output(snac, msg, v, 0); } diff --git a/data.c b/data.c index 3c25123..2fbb49e 100644 --- a/data.c +++ b/data.c @@ -1063,11 +1063,11 @@ void enqueue_input(snac *snac, char *msg, char *req, int retries) } -void enqueue_output(snac *snac, char *msg, char *actor, int retries) +void enqueue_output(snac *snac, char *msg, char *inbox, int retries) /* enqueues an output message for an actor */ { - if (strcmp(actor, snac->actor) == 0) { - snac_debug(snac, 1, xs_str_new("enqueue refused to myself")); + if (xs_startswith(inbox, snac->actor)) { + snac_debug(snac, 1, xs_str_new("refusing enqueue to myself")); return; } @@ -1078,13 +1078,13 @@ void enqueue_output(snac *snac, char *msg, char *actor, int retries) xs *rn = xs_number_new(retries); qmsg = xs_dict_append(qmsg, "type", "output"); - qmsg = xs_dict_append(qmsg, "actor", actor); + qmsg = xs_dict_append(qmsg, "inbox", inbox); qmsg = xs_dict_append(qmsg, "object", msg); qmsg = xs_dict_append(qmsg, "retries", rn); _enqueue_put(fn, qmsg); - snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries)); + snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); } diff --git a/snac.h b/snac.h index 4e4a981..45c099e 100644 --- a/snac.h +++ b/snac.h @@ -92,7 +92,7 @@ int history_del(snac *snac, char *id); d_char *history_list(snac *snac); void enqueue_input(snac *snac, char *msg, char *req, int retries); -void enqueue_output(snac *snac, char *msg, char *actor, int retries); +void enqueue_output(snac *snac, char *msg, char *inbox, int retries); void enqueue_email(snac *snac, char *msg, int retries); d_char *queue(snac *snac);