diff --git a/activitypub.c b/activitypub.c index 42558f4..5aae251 100644 --- a/activitypub.c +++ b/activitypub.c @@ -1188,6 +1188,14 @@ void process_queue_item(xs_dict *q_item) } } } + else + if (strcmp(type, "purge") == 0) { + srv_log(xs_dup("purge start")); + + purge_all(); + + srv_log(xs_dup("purge end")); + } } diff --git a/httpd.c b/httpd.c index 3e4eb7d..f747adc 100644 --- a/httpd.c +++ b/httpd.c @@ -237,81 +237,6 @@ void term_handler(int s) } -static void *purge_thread(void *arg) -/* spawned purge */ -{ - srv_log(xs_dup("purge start")); - - purge_all(); - - srv_log(xs_dup("purge end")); - - return NULL; -} - - -static void *background_thread(void *arg) -/* background thread (queue management and other things) */ -{ - time_t purge_time; - - /* first purge time */ - purge_time = time(NULL) + 10 * 60; - - srv_log(xs_fmt("background thread started")); - - while (srv_running) { - time_t t; - - { - xs *list = user_list(); - char *p, *uid; - - /* process queues for all users */ - p = list; - while (xs_list_iter(&p, &uid)) { - snac snac; - - if (user_open(&snac, uid)) { - process_user_queue(&snac); - user_free(&snac); - } - } - } - - /* global queue */ - process_queue(); - - /* time to purge? */ - if ((t = time(NULL)) > purge_time) { - pthread_t pth; - - pthread_create(&pth, NULL, purge_thread, NULL); - pthread_detach(pth); - - /* next purge time is tomorrow */ - purge_time = t + 24 * 60 * 60; - } - - /* sleep 3 seconds */ - pthread_mutex_t dummy_mutex = PTHREAD_MUTEX_INITIALIZER; - pthread_cond_t dummy_cond = PTHREAD_COND_INITIALIZER; - struct timespec ts; - - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += 3; - - pthread_mutex_lock(&dummy_mutex); - while (pthread_cond_timedwait(&dummy_cond, &dummy_mutex, &ts) == 0); - pthread_mutex_unlock(&dummy_mutex); - } - - srv_log(xs_fmt("background thread stopped")); - - return NULL; -} - - /** job control **/ /* mutex to access the lists of jobs */ @@ -391,6 +316,10 @@ static void *job_thread(void *arg) if (f != NULL) httpd_connection(f); } + else { + /* it's a q_item */ + process_queue_item(job); + } } srv_debug(0, xs_fmt("job thread %ld stopped", pid)); @@ -399,6 +328,67 @@ static void *job_thread(void *arg) } +static void *background_thread(void *arg) +/* background thread (queue management and other things) */ +{ + time_t purge_time; + + /* first purge time */ + purge_time = time(NULL) + 10 * 60; + + srv_log(xs_fmt("background thread started")); + + while (srv_running) { + time_t t; + + { + xs *list = user_list(); + char *p, *uid; + + /* process queues for all users */ + p = list; + while (xs_list_iter(&p, &uid)) { + snac snac; + + if (user_open(&snac, uid)) { + process_user_queue(&snac); + user_free(&snac); + } + } + } + + /* global queue */ + process_queue(); + + /* time to purge? */ + if ((t = time(NULL)) > purge_time) { + /* next purge time is tomorrow */ + purge_time = t + 24 * 60 * 60; + + xs *q_item = xs_dict_new(); + q_item = xs_dict_append(q_item, "type", "purge"); + job_post(q_item); + } + + /* sleep 3 seconds */ + pthread_mutex_t dummy_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t dummy_cond = PTHREAD_COND_INITIALIZER; + struct timespec ts; + + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 3; + + pthread_mutex_lock(&dummy_mutex); + while (pthread_cond_timedwait(&dummy_cond, &dummy_mutex, &ts) == 0); + pthread_mutex_unlock(&dummy_mutex); + } + + srv_log(xs_fmt("background thread stopped")); + + return NULL; +} + + void httpd(void) /* starts the server */ { diff --git a/snac.h b/snac.h index da4f7cd..e9124bb 100644 --- a/snac.h +++ b/snac.h @@ -169,7 +169,7 @@ int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_s int is_msg_public(snac *snac, char *msg); void process_user_queue(snac *snac); - +void process_queue_item(xs_dict *q_item); void process_queue(void); void post(snac *snac, char *msg);