New function process_queue_item().

This commit is contained in:
default 2023-01-31 22:30:34 +01:00
parent f0ef1d4115
commit c2524323a9

View file

@ -845,7 +845,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg)
/** queues **/ /** queues **/
int process_message(snac *snac, char *msg, char *req) int process_input_message(snac *snac, char *msg, char *req)
/* processes an ActivityPub message from the input queue */ /* processes an ActivityPub message from the input queue */
{ {
/* actor and type exist, were checked previously */ /* actor and type exist, were checked previously */
@ -1065,49 +1065,37 @@ int send_email(char *msg)
} }
void process_queue(snac *snac) void process_queue_item(snac *snac, xs_dict *q_item)
/* processes the queue */ /* processes an item from the queue */
{ {
xs *list;
char *p, *fn;
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
list = queue(snac);
p = list;
while (xs_list_iter(&p, &fn)) {
xs *q_item = dequeue(snac, fn);
char *type; char *type;
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
if (q_item == NULL) {
snac_log(snac, xs_fmt("process_queue q_item error"));
continue;
}
if ((type = xs_dict_get(q_item, "type")) == NULL) if ((type = xs_dict_get(q_item, "type")) == NULL)
type = "output"; type = "output";
if (strcmp(type, "message") == 0) { if (strcmp(type, "message") == 0) {
char *msg = xs_dict_get(q_item, "message"); xs_dict *msg = xs_dict_get(q_item, "message");
xs *inboxes = inbox_list(snac, msg); xs *inboxes = inbox_list(snac, msg);
char *p, *v; xs_list *p;
xs_str *inbox;
p = inboxes; p = inboxes;
while (xs_list_iter(&p, &v)) { while (xs_list_iter(&p, &inbox)) {
enqueue_output(snac, msg, v, 0); enqueue_output(snac, msg, inbox, 0);
} }
} }
else else
if (strcmp(type, "output") == 0) { if (strcmp(type, "output") == 0) {
int status; int status;
char *inbox = xs_dict_get(q_item, "inbox"); xs_str *inbox = xs_dict_get(q_item, "inbox");
char *msg = xs_dict_get(q_item, "message"); xs_dict *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries")); int retries = xs_number_get(xs_dict_get(q_item, "retries"));
xs *payload = NULL; xs *payload = NULL;
int p_size = 0; int p_size = 0;
if (xs_is_null(inbox) || xs_is_null(msg)) if (xs_is_null(inbox) || xs_is_null(msg))
continue; return;
/* deliver */ /* deliver */
status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
@ -1132,14 +1120,14 @@ void process_queue(snac *snac)
else else
if (strcmp(type, "input") == 0) { if (strcmp(type, "input") == 0) {
/* process the message */ /* process the message */
char *msg = xs_dict_get(q_item, "message"); xs_dict *msg = xs_dict_get(q_item, "message");
char *req = xs_dict_get(q_item, "req"); xs_dict *req = xs_dict_get(q_item, "req");
int retries = xs_number_get(xs_dict_get(q_item, "retries")); int retries = xs_number_get(xs_dict_get(q_item, "retries"));
if (xs_is_null(msg)) if (xs_is_null(msg))
continue; return;
if (!process_message(snac, msg, req)) { if (!process_input_message(snac, msg, req)) {
if (retries > queue_retry_max) if (retries > queue_retry_max)
snac_log(snac, xs_fmt("process_queue input giving up")); snac_log(snac, xs_fmt("process_queue input giving up"));
else { else {
@ -1152,7 +1140,7 @@ void process_queue(snac *snac)
else else
if (strcmp(type, "email") == 0) { if (strcmp(type, "email") == 0) {
/* send this email */ /* send this email */
char *msg = xs_dict_get(q_item, "message"); xs_str *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries")); int retries = xs_number_get(xs_dict_get(q_item, "retries"));
if (!send_email(msg)) if (!send_email(msg))
@ -1170,6 +1158,26 @@ void process_queue(snac *snac)
} }
} }
} }
void process_queue(snac *snac)
/* processes the queue */
{
xs *list = queue(snac);
xs_list *p = list;
xs_str *fn;
while (xs_list_iter(&p, &fn)) {
xs *q_item = dequeue(snac, fn);
if (q_item == NULL) {
snac_log(snac, xs_fmt("process_queue q_item error"));
continue;
}
process_queue_item(snac, q_item);
}
} }