diff --git a/httpd.c b/httpd.c index d068af3..2708437 100644 --- a/httpd.c +++ b/httpd.c @@ -13,6 +13,7 @@ #include #include +#include /* nodeinfo 2.0 template */ @@ -319,6 +320,80 @@ static void *connection_thread(void *arg) } +/** job control **/ + +/* mutex to access the lists of jobs */ +static pthread_mutex_t job_mutex; + +/* semaphre to trigger job processing */ +static sem_t job_sem; + +/* list of input sockets */ +xs_list *job_sockets = NULL; + +/* list of queue items */ +xs_list *job_qitems = NULL; + + +void job_post(FILE *socket, const xs_dict *q_item) +/* posts a job, being an input connection or another queue item */ +{ + /* lock the mutex */ + pthread_mutex_lock(&job_mutex); + + /* add to the appropriate fifo */ + if (socket != NULL) { + xs *d = xs_data_new(&socket, sizeof(FILE *)); + job_sockets = xs_list_append(job_sockets, d); + } + else + if (q_item != NULL) + job_qitems = xs_list_append(job_qitems, q_item); + + /* unlock the mutex */ + pthread_mutex_unlock(&job_mutex); + + /* ask for someone to attend it */ + sem_post(&job_sem); +} + + +int job_wait(FILE **socket, xs_dict **q_item) +/* waits for an available job; returns 0 if nothing left to do */ +{ + int done = 1; + + *socket = NULL; + *q_item = NULL; + + if (sem_wait(&job_sem) == 0) { + /* lock the mutex */ + pthread_mutex_lock(&job_mutex); + + /* try first to get a socket to process */ + xs *job_socket = NULL; + job_sockets = xs_list_shift(job_sockets, &job_socket); + + /* if empty, try a q_item */ + if (job_socket == NULL) + job_qitems = xs_list_shift(job_qitems, q_item); + + /* unlock the mutex */ + pthread_mutex_unlock(&job_mutex); + + if (job_socket != NULL) { + xs_data_get(job_socket, socket); + done = 0; + } + else + if (*q_item != NULL) + done = 0; + } + + return done; +} + + #ifndef MAX_THREADS #define MAX_THREADS 256 #endif @@ -348,8 +423,14 @@ void httpd(void) srv_log(xs_fmt("httpd start %s:%d %s", address, port, USER_AGENT)); - /* thread creation */ + /* initialize the job control engine */ + pthread_mutex_init(&job_mutex, NULL); + sem_init(&job_sem, 0, 0); + job_sockets = xs_list_new(); + job_qitems = xs_list_new(); + #ifdef _SC_NPROCESSORS_ONLN + /* get number of CPUs on the machine */ n_threads = sysconf(_SC_NPROCESSORS_ONLN); #endif @@ -380,5 +461,8 @@ void httpd(void) /* wait for the background thread to end */ pthread_join(threads[0], NULL); + job_sockets = xs_free(job_sockets); + job_qitems = xs_free(job_qitems); + srv_log(xs_fmt("httpd stop %s:%d", address, port)); }