Unified queue message field names.

This commit is contained in:
default 2023-01-31 20:22:36 +01:00
parent ba4df29abd
commit 94c4790bd7
2 changed files with 13 additions and 13 deletions

View file

@ -1101,7 +1101,7 @@ void process_queue(snac *snac)
if (strcmp(type, "output") == 0) { if (strcmp(type, "output") == 0) {
int status; int status;
char *inbox = xs_dict_get(q_item, "inbox"); char *inbox = xs_dict_get(q_item, "inbox");
char *msg = xs_dict_get(q_item, "object"); char *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries")); int retries = xs_number_get(xs_dict_get(q_item, "retries"));
xs *payload = NULL; xs *payload = NULL;
int p_size = 0; int p_size = 0;
@ -1132,10 +1132,13 @@ void process_queue(snac *snac)
else else
if (strcmp(type, "input") == 0) { if (strcmp(type, "input") == 0) {
/* process the message */ /* process the message */
char *msg = xs_dict_get(q_item, "object"); char *msg = xs_dict_get(q_item, "message");
char *req = xs_dict_get(q_item, "req"); char *req = xs_dict_get(q_item, "req");
int retries = xs_number_get(xs_dict_get(q_item, "retries")); int retries = xs_number_get(xs_dict_get(q_item, "retries"));
if (xs_is_null(msg))
continue;
if (!process_message(snac, msg, req)) { if (!process_message(snac, msg, req)) {
if (retries > queue_retry_max) if (retries > queue_retry_max)
snac_log(snac, xs_fmt("process_queue input giving up")); snac_log(snac, xs_fmt("process_queue input giving up"));

19
data.c
View file

@ -1299,10 +1299,9 @@ d_char *history_list(snac *snac)
/** the queue **/ /** the queue **/
static int _enqueue_put(char *fn, char *msg) static xs_dict *_enqueue_put(const char *fn, xs_dict *msg)
/* writes safely to the queue */ /* writes safely to the queue */
{ {
int ret = 1;
xs *tfn = xs_fmt("%s.tmp", fn); xs *tfn = xs_fmt("%s.tmp", fn);
FILE *f; FILE *f;
@ -1314,10 +1313,8 @@ static int _enqueue_put(char *fn, char *msg)
rename(tfn, fn); rename(tfn, fn);
} }
else
ret = 0;
return ret; return msg;
} }
@ -1331,11 +1328,11 @@ void enqueue_input(snac *snac, char *msg, char *req, int retries)
xs *rn = xs_number_new(retries); xs *rn = xs_number_new(retries);
qmsg = xs_dict_append(qmsg, "type", "input"); qmsg = xs_dict_append(qmsg, "type", "input");
qmsg = xs_dict_append(qmsg, "object", msg); qmsg = xs_dict_append(qmsg, "message", msg);
qmsg = xs_dict_append(qmsg, "req", req); qmsg = xs_dict_append(qmsg, "req", req);
qmsg = xs_dict_append(qmsg, "retries", rn); qmsg = xs_dict_append(qmsg, "retries", rn);
_enqueue_put(fn, qmsg); qmsg = _enqueue_put(fn, qmsg);
snac_debug(snac, 1, xs_fmt("enqueue_input %s", fn)); snac_debug(snac, 1, xs_fmt("enqueue_input %s", fn));
} }
@ -1357,10 +1354,10 @@ void enqueue_output(snac *snac, char *msg, char *inbox, int retries)
qmsg = xs_dict_append(qmsg, "type", "output"); qmsg = xs_dict_append(qmsg, "type", "output");
qmsg = xs_dict_append(qmsg, "inbox", inbox); qmsg = xs_dict_append(qmsg, "inbox", inbox);
qmsg = xs_dict_append(qmsg, "object", msg); qmsg = xs_dict_append(qmsg, "message", msg);
qmsg = xs_dict_append(qmsg, "retries", rn); qmsg = xs_dict_append(qmsg, "retries", rn);
_enqueue_put(fn, qmsg); qmsg = _enqueue_put(fn, qmsg);
snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
} }
@ -1391,7 +1388,7 @@ void enqueue_email(snac *snac, char *msg, int retries)
qmsg = xs_dict_append(qmsg, "message", msg); qmsg = xs_dict_append(qmsg, "message", msg);
qmsg = xs_dict_append(qmsg, "retries", rn); qmsg = xs_dict_append(qmsg, "retries", rn);
_enqueue_put(fn, qmsg); qmsg = _enqueue_put(fn, qmsg);
snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries)); snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries));
} }
@ -1408,7 +1405,7 @@ void enqueue_message(snac *snac, char *msg)
qmsg = xs_dict_append(qmsg, "type", "message"); qmsg = xs_dict_append(qmsg, "type", "message");
qmsg = xs_dict_append(qmsg, "message", msg); qmsg = xs_dict_append(qmsg, "message", msg);
_enqueue_put(fn, qmsg); qmsg = _enqueue_put(fn, qmsg);
snac_debug(snac, 0, xs_fmt("enqueue_message %s", id)); snac_debug(snac, 0, xs_fmt("enqueue_message %s", id));
} }