New functions job_post() and job_wait() (untested).

This commit is contained in:
default 2023-02-06 11:29:46 +01:00
parent 2ee6bdc745
commit 6bcc6bfa1c

86
httpd.c
View file

@ -13,6 +13,7 @@
#include <setjmp.h> #include <setjmp.h>
#include <pthread.h> #include <pthread.h>
#include <semaphore.h>
/* nodeinfo 2.0 template */ /* nodeinfo 2.0 template */
@ -319,6 +320,80 @@ static void *connection_thread(void *arg)
} }
/** job control **/
/* mutex to access the lists of jobs */
static pthread_mutex_t job_mutex;
/* semaphre to trigger job processing */
static sem_t job_sem;
/* list of input sockets */
xs_list *job_sockets = NULL;
/* list of queue items */
xs_list *job_qitems = NULL;
void job_post(FILE *socket, const xs_dict *q_item)
/* posts a job, being an input connection or another queue item */
{
/* lock the mutex */
pthread_mutex_lock(&job_mutex);
/* add to the appropriate fifo */
if (socket != NULL) {
xs *d = xs_data_new(&socket, sizeof(FILE *));
job_sockets = xs_list_append(job_sockets, d);
}
else
if (q_item != NULL)
job_qitems = xs_list_append(job_qitems, q_item);
/* unlock the mutex */
pthread_mutex_unlock(&job_mutex);
/* ask for someone to attend it */
sem_post(&job_sem);
}
int job_wait(FILE **socket, xs_dict **q_item)
/* waits for an available job; returns 0 if nothing left to do */
{
int done = 1;
*socket = NULL;
*q_item = NULL;
if (sem_wait(&job_sem) == 0) {
/* lock the mutex */
pthread_mutex_lock(&job_mutex);
/* try first to get a socket to process */
xs *job_socket = NULL;
job_sockets = xs_list_shift(job_sockets, &job_socket);
/* if empty, try a q_item */
if (job_socket == NULL)
job_qitems = xs_list_shift(job_qitems, q_item);
/* unlock the mutex */
pthread_mutex_unlock(&job_mutex);
if (job_socket != NULL) {
xs_data_get(job_socket, socket);
done = 0;
}
else
if (*q_item != NULL)
done = 0;
}
return done;
}
#ifndef MAX_THREADS #ifndef MAX_THREADS
#define MAX_THREADS 256 #define MAX_THREADS 256
#endif #endif
@ -348,8 +423,14 @@ void httpd(void)
srv_log(xs_fmt("httpd start %s:%d %s", address, port, USER_AGENT)); srv_log(xs_fmt("httpd start %s:%d %s", address, port, USER_AGENT));
/* thread creation */ /* initialize the job control engine */
pthread_mutex_init(&job_mutex, NULL);
sem_init(&job_sem, 0, 0);
job_sockets = xs_list_new();
job_qitems = xs_list_new();
#ifdef _SC_NPROCESSORS_ONLN #ifdef _SC_NPROCESSORS_ONLN
/* get number of CPUs on the machine */
n_threads = sysconf(_SC_NPROCESSORS_ONLN); n_threads = sysconf(_SC_NPROCESSORS_ONLN);
#endif #endif
@ -380,5 +461,8 @@ void httpd(void)
/* wait for the background thread to end */ /* wait for the background thread to end */
pthread_join(threads[0], NULL); pthread_join(threads[0], NULL);
job_sockets = xs_free(job_sockets);
job_qitems = xs_free(job_qitems);
srv_log(xs_fmt("httpd stop %s:%d", address, port)); srv_log(xs_fmt("httpd stop %s:%d", address, port));
} }