From 93e7138e53628a76bed9583c5eb45ccf17b97e21 Mon Sep 17 00:00:00 2001 From: default Date: Mon, 8 Jan 2024 08:10:57 +0100 Subject: [PATCH] Rewritten part of the job threads to be leaner and faster. --- data.c | 2 +- httpd.c | 84 ++++++++++++++++++++++++++++++--------------------------- snac.h | 1 - 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/data.c b/data.c index ecddfc3..f2b4e0f 100644 --- a/data.c +++ b/data.c @@ -2168,7 +2168,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey, qmsg = xs_dict_append(qmsg, "seckey", seckey); /* if it's to be sent right now, bypass the disk queue and post the job */ - if (retries == 0 && job_fifo_ready()) + if (retries == 0) job_post(qmsg, 0); else { qmsg = _enqueue_put(fn, qmsg); diff --git a/httpd.c b/httpd.c index 7e71049..c127fee 100644 --- a/httpd.c +++ b/httpd.c @@ -31,16 +31,22 @@ srv_stat s_stat = {0}; srv_stat *p_stat = NULL; + /** job control **/ /* mutex to access the lists of jobs */ static pthread_mutex_t job_mutex; -/* semaphre to trigger job processing */ +/* semaphore to trigger job processing */ static sem_t *job_sem; -/* fifo of jobs */ -xs_list *job_fifo = NULL; +typedef struct job_fifo_item { + struct job_fifo_item *next; + xs_val *job; +} job_fifo_item; + +static job_fifo_item *job_fifo_first = NULL; +static job_fifo_item *job_fifo_last = NULL; /* nodeinfo 2.0 template */ @@ -418,24 +424,6 @@ void httpd_connection(FILE *f) } -static jmp_buf on_break; - - -void term_handler(int s) -{ - (void)s; - - longjmp(on_break, 1); -} - - -int job_fifo_ready(void) -/* returns true if the job fifo is ready */ -{ - return job_fifo != NULL; -} - - void job_post(const xs_val *job, int urgent) /* posts a job for the threads to process it */ { @@ -443,18 +431,24 @@ void job_post(const xs_val *job, int urgent) /* lock the mutex */ pthread_mutex_lock(&job_mutex); - /* add to the fifo */ - if (job_fifo != NULL) { - if (urgent) - job_fifo = xs_list_insert(job_fifo, 0, job); - else - job_fifo = xs_list_append(job_fifo, job); + job_fifo_item *i = xs_realloc(NULL, sizeof(job_fifo_item)); + *i = (job_fifo_item){ NULL, xs_dup(job) }; - p_stat->job_fifo_size++; - - srv_debug(2, xs_fmt( - "job_fifo sizes: %d %08x", p_stat->job_fifo_size, xs_size(job_fifo))); + if (job_fifo_first == NULL) + job_fifo_first = job_fifo_last = i; + else + if (urgent) { + /* prepend */ + i->next = job_fifo_first; + job_fifo_first = i; } + else { + /* append */ + job_fifo_last->next = i; + job_fifo_last = i; + } + + p_stat->job_fifo_size++; /* unlock the mutex */ pthread_mutex_unlock(&job_mutex); @@ -475,8 +469,16 @@ void job_wait(xs_val **job) pthread_mutex_lock(&job_mutex); /* dequeue */ - if (job_fifo != NULL) { - job_fifo = xs_list_shift(job_fifo, job); + job_fifo_item *i = job_fifo_first; + + if (i != NULL) { + job_fifo_first = i->next; + + if (job_fifo_first == NULL) + job_fifo_last = NULL; + + *job = i->job; + xs_free(i); p_stat->job_fifo_size--; } @@ -604,6 +606,16 @@ static void *background_thread(void *arg) } +static jmp_buf on_break; + +void term_handler(int s) +{ + (void)s; + + longjmp(on_break, 1); +} + + void httpd(void) /* starts the server */ { @@ -663,8 +675,6 @@ void httpd(void) return; } - job_fifo = xs_list_new(); - /* initialize sleep control */ pthread_mutex_init(&sleep_mutex, NULL); pthread_cond_init(&sleep_cond, NULL); @@ -717,10 +727,6 @@ void httpd(void) for (n = 0; n < p_stat->n_threads; n++) pthread_join(threads[n], NULL); - pthread_mutex_lock(&job_mutex); - job_fifo = xs_free(job_fifo); - pthread_mutex_unlock(&job_mutex); - sem_close(job_sem); sem_unlink(sem_name); diff --git a/snac.h b/snac.h index 26ddfee..4030717 100644 --- a/snac.h +++ b/snac.h @@ -294,7 +294,6 @@ int deluser(snac *user); extern const char *snac_blurb; -int job_fifo_ready(void); void job_post(const xs_val *job, int urgent); void job_wait(xs_val **job);