diff --git a/activitypub.c b/activitypub.c index aea353b..5cc059d 100644 --- a/activitypub.c +++ b/activitypub.c @@ -1066,7 +1066,7 @@ int send_email(char *msg) void process_user_queue_item(snac *snac, xs_dict *q_item) -/* processes an item from the queue */ +/* processes an item from the user queue */ { char *type; int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); @@ -1172,7 +1172,7 @@ void process_user_queue(snac *snac) xs *q_item = dequeue(fn); if (q_item == NULL) { - snac_log(snac, xs_fmt("process_queue q_item error")); + snac_log(snac, xs_fmt("process_user_queue q_item error")); continue; } @@ -1181,6 +1181,33 @@ void process_user_queue(snac *snac) } +void process_queue_item(xs_dict *q_item) +/* processes an item from the global queue */ +{ +} + + +void process_queue(void) +/* processes the global queue */ +{ + xs *list = queue(); + + xs_list *p = list; + xs_str *fn; + + while (xs_list_iter(&p, &fn)) { + xs *q_item = dequeue(fn); + + if (q_item == NULL) { + srv_log(xs_fmt("process_queue q_item error")); + continue; + } + + process_queue_item(q_item); + } +} + + /** HTTP handlers */ int activitypub_get_handler(d_char *req, char *q_path, diff --git a/data.c b/data.c index 1adf4cd..96583aa 100644 --- a/data.c +++ b/data.c @@ -86,6 +86,10 @@ int srv_open(char *basedir, int auto_upgrade) if (error != NULL) srv_log(error); + /* create the queue/ subdir, just in case */ + xs *qdir = xs_fmt("%s/queue", srv_basedir); + mkdir(qdir, 0755); + #ifdef __OpenBSD__ char *v = xs_dict_get(srv_config, "disable_openbsd_security"); @@ -1429,10 +1433,39 @@ xs_list *user_queue(snac *snac) time_t t2 = atol(bn + 1); if (t2 > t) - snac_debug(snac, 2, xs_fmt("queue not yet time for %s [%ld]", v, t)); + snac_debug(snac, 2, xs_fmt("user_queue not yet time for %s [%ld]", v, t)); else { list = xs_list_append(list, v); - snac_debug(snac, 2, xs_fmt("queue ready for %s", v)); + snac_debug(snac, 2, xs_fmt("user_queue ready for %s", v)); + } + } + + return list; +} + + +xs_list *queue(void) +/* returns a list with filenames that can be dequeued */ +{ + xs *spec = xs_fmt("%s/queue/" "*.json", srv_basedir); + xs_list *list = xs_list_new(); + time_t t = time(NULL); + xs_list *p; + xs_val *v; + + xs *fns = xs_glob(spec, 0, 0); + + p = fns; + while (xs_list_iter(&p, &v)) { + /* get the retry time from the basename */ + char *bn = strrchr(v, '/'); + time_t t2 = atol(bn + 1); + + if (t2 > t) + srv_debug(2, xs_fmt("queue not yet time for %s [%ld]", v, t)); + else { + list = xs_list_append(list, v); + srv_debug(2, xs_fmt("queue ready for %s", v)); } }