Output messages are now processed by the pool of threads.

This commit is contained in:
default 2023-02-07 13:31:48 +01:00
parent 8f63c6259a
commit 4cca157641
3 changed files with 60 additions and 10 deletions

View file

@ -1195,6 +1195,44 @@ void process_queue_item(xs_dict *q_item)
char *type = xs_dict_get(q_item, "type"); char *type = xs_dict_get(q_item, "type");
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
if (strcmp(type, "output") == 0) {
int status;
xs_str *inbox = xs_dict_get(q_item, "inbox");
xs_str *keyid = xs_dict_get(q_item, "keyid");
xs_str *seckey = xs_dict_get(q_item, "seckey");
xs_dict *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries"));
xs *payload = NULL;
int p_size = 0;
if (xs_is_null(inbox) || xs_is_null(msg) || xs_is_null(keyid) || xs_is_null(seckey)) {
srv_log(xs_fmt("output message error: missing fields"));
return;
}
/* deliver */
status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
srv_log(xs_fmt("output message: sent to inbox %s %d", inbox, status));
if (!valid_status(status)) {
retries++;
/* error sending; requeue? */
if (status == 404 || status == 410)
/* explicit error: discard */
srv_log(xs_fmt("output message: fatal error %s %d", inbox, status));
else
if (retries > queue_retry_max)
srv_log(xs_fmt("output message: giving up %s %d", inbox, status));
else {
/* requeue */
enqueue_output_raw(keyid, seckey, msg, inbox, retries);
srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries));
}
}
}
else
if (strcmp(type, "email") == 0) { if (strcmp(type, "email") == 0) {
/* send this email */ /* send this email */
xs_str *msg = xs_dict_get(q_item, "message"); xs_str *msg = xs_dict_get(q_item, "message");

30
data.c
View file

@ -1373,6 +1373,24 @@ void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries)
} }
void enqueue_output_raw(const char *keyid, const char *seckey,
xs_dict *msg, xs_str *inbox, int retries)
/* enqueues an output message to an inbox */
{
xs *qmsg = _new_qmsg("output", msg, retries);
char *ntid = xs_dict_get(qmsg, "ntid");
xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid);
qmsg = xs_dict_append(qmsg, "inbox", inbox);
qmsg = xs_dict_append(qmsg, "keyid", keyid);
qmsg = xs_dict_append(qmsg, "seckey", seckey);
qmsg = _enqueue_put(fn, qmsg);
srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
}
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries)
/* enqueues an output message to an inbox */ /* enqueues an output message to an inbox */
{ {
@ -1381,17 +1399,9 @@ void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries)
return; return;
} }
xs *qmsg = _new_qmsg("output", msg, retries); char *seckey = xs_dict_get(snac->key, "secret");
char *ntid = xs_dict_get(qmsg, "ntid");
xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
qmsg = xs_dict_append(qmsg, "inbox", inbox); enqueue_output_raw(snac->actor, seckey, msg, inbox, retries);
qmsg = xs_dict_append(qmsg, "keyid", snac->actor);
qmsg = xs_dict_append(qmsg, "seckey", xs_dict_get(snac->key, "secret"));
qmsg = _enqueue_put(fn, qmsg);
snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
} }

2
snac.h
View file

@ -129,6 +129,8 @@ int history_del(snac *snac, char *id);
d_char *history_list(snac *snac); d_char *history_list(snac *snac);
void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries);
void enqueue_output_raw(const char *keyid, const char *seckey,
xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries);
void enqueue_email(xs_str *msg, int retries); void enqueue_email(xs_str *msg, int retries);