Email notifications have been moved to the global queue.

This commit is contained in:
default 2023-02-02 05:21:16 +01:00
parent c639553836
commit 928f22fbba
4 changed files with 37 additions and 25 deletions

View file

@ -839,7 +839,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg)
body = xs_str_cat(body, s1); body = xs_str_cat(body, s1);
} }
enqueue_email(snac, body, 0); enqueue_email(body, 0);
} }
@ -1137,26 +1137,6 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
} }
} }
} }
else
if (strcmp(type, "email") == 0) {
/* send this email */
xs_str *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries"));
if (!send_email(msg))
snac_debug(snac, 1, xs_fmt("email message sent"));
else {
if (retries > queue_retry_max)
snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno));
else {
/* requeue */
snac_log(snac, xs_fmt(
"process_queue email requeue #%d (errno: %d)", retries + 1, errno));
enqueue_email(snac, msg, retries + 1);
}
}
}
} }
@ -1184,6 +1164,30 @@ void process_user_queue(snac *snac)
void process_queue_item(xs_dict *q_item) void process_queue_item(xs_dict *q_item)
/* processes an item from the global queue */ /* processes an item from the global queue */
{ {
char *type = xs_dict_get(q_item, "type");
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
if (strcmp(type, "email") == 0) {
/* send this email */
xs_str *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries"));
if (!send_email(msg))
srv_debug(1, xs_fmt("email message sent"));
else {
retries++;
if (retries > queue_retry_max)
srv_log(xs_fmt("process_queue email giving up (errno: %d)", errno));
else {
/* requeue */
srv_log(xs_fmt(
"process_queue email requeue #%d (errno: %d)", retries, errno));
enqueue_email(msg, retries);
}
}
}
} }

6
data.c
View file

@ -1389,16 +1389,16 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retrie
} }
void enqueue_email(snac *snac, xs_str *msg, int retries) void enqueue_email(xs_str *msg, int retries)
/* enqueues an email message to be sent */ /* enqueues an email message to be sent */
{ {
xs *qmsg = _new_qmsg("email", msg, retries); xs *qmsg = _new_qmsg("email", msg, retries);
char *ntid = xs_dict_get(qmsg, "ntid"); char *ntid = xs_dict_get(qmsg, "ntid");
xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid);
qmsg = _enqueue_put(fn, qmsg); qmsg = _enqueue_put(fn, qmsg);
snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries)); srv_debug(1, xs_fmt("enqueue_email %d", retries));
} }

View file

@ -280,6 +280,9 @@ static void *queue_thread(void *arg)
} }
} }
/* global queue */
process_queue();
/* time to purge? */ /* time to purge? */
if ((t = time(NULL)) > purge_time) { if ((t = time(NULL)) > purge_time) {
pthread_t pth; pthread_t pth;

7
snac.h
View file

@ -126,10 +126,11 @@ d_char *history_list(snac *snac);
void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries);
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries);
void enqueue_email(snac *snac, xs_str *msg, int retries); void enqueue_email(xs_str *msg, int retries);
void enqueue_message(snac *snac, char *msg); void enqueue_message(snac *snac, char *msg);
xs_list *user_queue(snac *snac); xs_list *user_queue(snac *snac);
xs_list *queue(void);
xs_dict *dequeue(const char *fn); xs_dict *dequeue(const char *fn);
void purge(snac *snac); void purge(snac *snac);
@ -165,7 +166,11 @@ int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_s
d_char *get_actor_inbox(snac *snac, char *actor); d_char *get_actor_inbox(snac *snac, char *actor);
int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout); int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout);
int is_msg_public(snac *snac, char *msg); int is_msg_public(snac *snac, char *msg);
void process_user_queue(snac *snac); void process_user_queue(snac *snac);
void process_queue(void);
void post(snac *snac, char *msg); void post(snac *snac, char *msg);
int activitypub_get_handler(d_char *req, char *q_path, int activitypub_get_handler(d_char *req, char *q_path,
char **body, int *b_size, char **ctype); char **body, int *b_size, char **ctype);