New function queue() (the global queue).

This commit is contained in:
default 2023-02-02 05:07:20 +01:00
parent 108ed072e2
commit c639553836
2 changed files with 64 additions and 4 deletions

View file

@ -1066,7 +1066,7 @@ int send_email(char *msg)
void process_user_queue_item(snac *snac, xs_dict *q_item) void process_user_queue_item(snac *snac, xs_dict *q_item)
/* processes an item from the queue */ /* processes an item from the user queue */
{ {
char *type; char *type;
int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
@ -1172,7 +1172,7 @@ void process_user_queue(snac *snac)
xs *q_item = dequeue(fn); xs *q_item = dequeue(fn);
if (q_item == NULL) { if (q_item == NULL) {
snac_log(snac, xs_fmt("process_queue q_item error")); snac_log(snac, xs_fmt("process_user_queue q_item error"));
continue; continue;
} }
@ -1181,6 +1181,33 @@ void process_user_queue(snac *snac)
} }
void process_queue_item(xs_dict *q_item)
/* processes an item from the global queue */
{
}
void process_queue(void)
/* processes the global queue */
{
xs *list = queue();
xs_list *p = list;
xs_str *fn;
while (xs_list_iter(&p, &fn)) {
xs *q_item = dequeue(fn);
if (q_item == NULL) {
srv_log(xs_fmt("process_queue q_item error"));
continue;
}
process_queue_item(q_item);
}
}
/** HTTP handlers */ /** HTTP handlers */
int activitypub_get_handler(d_char *req, char *q_path, int activitypub_get_handler(d_char *req, char *q_path,

37
data.c
View file

@ -86,6 +86,10 @@ int srv_open(char *basedir, int auto_upgrade)
if (error != NULL) if (error != NULL)
srv_log(error); srv_log(error);
/* create the queue/ subdir, just in case */
xs *qdir = xs_fmt("%s/queue", srv_basedir);
mkdir(qdir, 0755);
#ifdef __OpenBSD__ #ifdef __OpenBSD__
char *v = xs_dict_get(srv_config, "disable_openbsd_security"); char *v = xs_dict_get(srv_config, "disable_openbsd_security");
@ -1429,10 +1433,39 @@ xs_list *user_queue(snac *snac)
time_t t2 = atol(bn + 1); time_t t2 = atol(bn + 1);
if (t2 > t) if (t2 > t)
snac_debug(snac, 2, xs_fmt("queue not yet time for %s [%ld]", v, t)); snac_debug(snac, 2, xs_fmt("user_queue not yet time for %s [%ld]", v, t));
else { else {
list = xs_list_append(list, v); list = xs_list_append(list, v);
snac_debug(snac, 2, xs_fmt("queue ready for %s", v)); snac_debug(snac, 2, xs_fmt("user_queue ready for %s", v));
}
}
return list;
}
xs_list *queue(void)
/* returns a list with filenames that can be dequeued */
{
xs *spec = xs_fmt("%s/queue/" "*.json", srv_basedir);
xs_list *list = xs_list_new();
time_t t = time(NULL);
xs_list *p;
xs_val *v;
xs *fns = xs_glob(spec, 0, 0);
p = fns;
while (xs_list_iter(&p, &v)) {
/* get the retry time from the basename */
char *bn = strrchr(v, '/');
time_t t2 = atol(bn + 1);
if (t2 > t)
srv_debug(2, xs_fmt("queue not yet time for %s [%ld]", v, t));
else {
list = xs_list_append(list, v);
srv_debug(2, xs_fmt("queue ready for %s", v));
} }
} }