mirror of
https://codeberg.org/grunfink/snac2.git
synced 2024-12-25 16:53:37 +00:00
Queue messages to inboxes instead of actors.
This commit is contained in:
parent
a12da31bdf
commit
f0e17d6753
3 changed files with 39 additions and 14 deletions
|
@ -212,6 +212,28 @@ d_char *recipient_list(snac *snac, char *msg, int expand_public)
|
|||
}
|
||||
|
||||
|
||||
d_char *inbox_list(snac *snac, char *msg)
|
||||
/* returns the list of inboxes that are recipients of this message */
|
||||
{
|
||||
d_char *list = xs_list_new();
|
||||
xs *rcpts = recipient_list(snac, msg, 1);
|
||||
char *p, *v;
|
||||
|
||||
p = rcpts;
|
||||
while (xs_list_iter(&p, &v)) {
|
||||
xs *inbox;
|
||||
|
||||
if ((inbox = get_actor_inbox(snac, v)) != NULL) {
|
||||
/* add the inbox if it's not already there */
|
||||
if (xs_list_in(list, inbox) == -1)
|
||||
list = xs_list_append(list, inbox);
|
||||
}
|
||||
}
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
|
||||
int is_msg_public(snac *snac, char *msg)
|
||||
/* checks if a message is public */
|
||||
{
|
||||
|
@ -930,27 +952,30 @@ void process_queue(snac *snac)
|
|||
|
||||
if (strcmp(type, "output") == 0) {
|
||||
int status;
|
||||
char *actor = xs_dict_get(q_item, "actor");
|
||||
char *inbox = xs_dict_get(q_item, "inbox");
|
||||
char *msg = xs_dict_get(q_item, "object");
|
||||
int retries = xs_number_get(xs_dict_get(q_item, "retries"));
|
||||
xs *payload = NULL;
|
||||
int p_size = 0;
|
||||
|
||||
if (xs_is_null(inbox) || xs_is_null(msg))
|
||||
continue;
|
||||
|
||||
/* deliver */
|
||||
status = send_to_actor(snac, actor, msg, &payload, &p_size);
|
||||
status = send_to_inbox(snac, inbox, msg, &payload, &p_size);
|
||||
|
||||
if (!valid_status(status)) {
|
||||
/* error sending; requeue? */
|
||||
if (retries > queue_retry_max)
|
||||
snac_log(snac, xs_fmt("process_queue giving up %s %d", actor, status));
|
||||
snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
|
||||
else {
|
||||
/* requeue */
|
||||
enqueue_output(snac, msg, actor, retries + 1);
|
||||
snac_log(snac, xs_fmt("process_queue requeue %s %d", actor, retries + 1));
|
||||
enqueue_output(snac, msg, inbox, retries + 1);
|
||||
snac_log(snac, xs_fmt("process_queue requeue %s %d", inbox, retries + 1));
|
||||
}
|
||||
}
|
||||
else
|
||||
snac_log(snac, xs_fmt("process_queue sent to actor %s %d", actor, status));
|
||||
snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
|
||||
}
|
||||
else
|
||||
if (strcmp(type, "input") == 0) {
|
||||
|
@ -1005,10 +1030,10 @@ void process_queue(snac *snac)
|
|||
void post(snac *snac, char *msg)
|
||||
/* enqueues a message to all its recipients */
|
||||
{
|
||||
xs *rcpts = recipient_list(snac, msg, 1);
|
||||
xs *inboxes = inbox_list(snac, msg);
|
||||
char *p, *v;
|
||||
|
||||
p = rcpts;
|
||||
p = inboxes;
|
||||
while (xs_list_iter(&p, &v)) {
|
||||
enqueue_output(snac, msg, v, 0);
|
||||
}
|
||||
|
|
10
data.c
10
data.c
|
@ -1063,11 +1063,11 @@ void enqueue_input(snac *snac, char *msg, char *req, int retries)
|
|||
}
|
||||
|
||||
|
||||
void enqueue_output(snac *snac, char *msg, char *actor, int retries)
|
||||
void enqueue_output(snac *snac, char *msg, char *inbox, int retries)
|
||||
/* enqueues an output message for an actor */
|
||||
{
|
||||
if (strcmp(actor, snac->actor) == 0) {
|
||||
snac_debug(snac, 1, xs_str_new("enqueue refused to myself"));
|
||||
if (xs_startswith(inbox, snac->actor)) {
|
||||
snac_debug(snac, 1, xs_str_new("refusing enqueue to myself"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1078,13 +1078,13 @@ void enqueue_output(snac *snac, char *msg, char *actor, int retries)
|
|||
xs *rn = xs_number_new(retries);
|
||||
|
||||
qmsg = xs_dict_append(qmsg, "type", "output");
|
||||
qmsg = xs_dict_append(qmsg, "actor", actor);
|
||||
qmsg = xs_dict_append(qmsg, "inbox", inbox);
|
||||
qmsg = xs_dict_append(qmsg, "object", msg);
|
||||
qmsg = xs_dict_append(qmsg, "retries", rn);
|
||||
|
||||
_enqueue_put(fn, qmsg);
|
||||
|
||||
snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries));
|
||||
snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
|
||||
}
|
||||
|
||||
|
||||
|
|
2
snac.h
2
snac.h
|
@ -92,7 +92,7 @@ int history_del(snac *snac, char *id);
|
|||
d_char *history_list(snac *snac);
|
||||
|
||||
void enqueue_input(snac *snac, char *msg, char *req, int retries);
|
||||
void enqueue_output(snac *snac, char *msg, char *actor, int retries);
|
||||
void enqueue_output(snac *snac, char *msg, char *inbox, int retries);
|
||||
void enqueue_email(snac *snac, char *msg, int retries);
|
||||
|
||||
d_char *queue(snac *snac);
|
||||
|
|
Loading…
Reference in a new issue