mirror of
https://codeberg.org/grunfink/snac2.git
synced 2024-11-22 13:25:04 +00:00
Simplified job_post() and job_wait().
This commit is contained in:
parent
6bcc6bfa1c
commit
06fc40e1cd
1 changed files with 13 additions and 42 deletions
55
httpd.c
55
httpd.c
|
@ -328,27 +328,18 @@ static pthread_mutex_t job_mutex;
|
||||||
/* semaphre to trigger job processing */
|
/* semaphre to trigger job processing */
|
||||||
static sem_t job_sem;
|
static sem_t job_sem;
|
||||||
|
|
||||||
/* list of input sockets */
|
/* fifo of jobs */
|
||||||
xs_list *job_sockets = NULL;
|
xs_list *job_fifo = NULL;
|
||||||
|
|
||||||
/* list of queue items */
|
|
||||||
xs_list *job_qitems = NULL;
|
|
||||||
|
|
||||||
|
|
||||||
void job_post(FILE *socket, const xs_dict *q_item)
|
void job_post(const xs_val *job)
|
||||||
/* posts a job, being an input connection or another queue item */
|
/* posts a job for the threads to process it */
|
||||||
{
|
{
|
||||||
/* lock the mutex */
|
/* lock the mutex */
|
||||||
pthread_mutex_lock(&job_mutex);
|
pthread_mutex_lock(&job_mutex);
|
||||||
|
|
||||||
/* add to the appropriate fifo */
|
/* add to the fifo */
|
||||||
if (socket != NULL) {
|
job_fifo = xs_list_append(job_fifo, job);
|
||||||
xs *d = xs_data_new(&socket, sizeof(FILE *));
|
|
||||||
job_sockets = xs_list_append(job_sockets, d);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
if (q_item != NULL)
|
|
||||||
job_qitems = xs_list_append(job_qitems, q_item);
|
|
||||||
|
|
||||||
/* unlock the mutex */
|
/* unlock the mutex */
|
||||||
pthread_mutex_unlock(&job_mutex);
|
pthread_mutex_unlock(&job_mutex);
|
||||||
|
@ -358,39 +349,21 @@ void job_post(FILE *socket, const xs_dict *q_item)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int job_wait(FILE **socket, xs_dict **q_item)
|
void job_wait(xs_val **job)
|
||||||
/* waits for an available job; returns 0 if nothing left to do */
|
/* waits for an available job */
|
||||||
{
|
{
|
||||||
int done = 1;
|
*job = NULL;
|
||||||
|
|
||||||
*socket = NULL;
|
|
||||||
*q_item = NULL;
|
|
||||||
|
|
||||||
if (sem_wait(&job_sem) == 0) {
|
if (sem_wait(&job_sem) == 0) {
|
||||||
/* lock the mutex */
|
/* lock the mutex */
|
||||||
pthread_mutex_lock(&job_mutex);
|
pthread_mutex_lock(&job_mutex);
|
||||||
|
|
||||||
/* try first to get a socket to process */
|
/* dequeue */
|
||||||
xs *job_socket = NULL;
|
job_fifo = xs_list_shift(job_fifo, job);
|
||||||
job_sockets = xs_list_shift(job_sockets, &job_socket);
|
|
||||||
|
|
||||||
/* if empty, try a q_item */
|
|
||||||
if (job_socket == NULL)
|
|
||||||
job_qitems = xs_list_shift(job_qitems, q_item);
|
|
||||||
|
|
||||||
/* unlock the mutex */
|
/* unlock the mutex */
|
||||||
pthread_mutex_unlock(&job_mutex);
|
pthread_mutex_unlock(&job_mutex);
|
||||||
|
|
||||||
if (job_socket != NULL) {
|
|
||||||
xs_data_get(job_socket, socket);
|
|
||||||
done = 0;
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
if (*q_item != NULL)
|
|
||||||
done = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return done;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -426,8 +399,7 @@ void httpd(void)
|
||||||
/* initialize the job control engine */
|
/* initialize the job control engine */
|
||||||
pthread_mutex_init(&job_mutex, NULL);
|
pthread_mutex_init(&job_mutex, NULL);
|
||||||
sem_init(&job_sem, 0, 0);
|
sem_init(&job_sem, 0, 0);
|
||||||
job_sockets = xs_list_new();
|
job_fifo = xs_list_new();
|
||||||
job_qitems = xs_list_new();
|
|
||||||
|
|
||||||
#ifdef _SC_NPROCESSORS_ONLN
|
#ifdef _SC_NPROCESSORS_ONLN
|
||||||
/* get number of CPUs on the machine */
|
/* get number of CPUs on the machine */
|
||||||
|
@ -461,8 +433,7 @@ void httpd(void)
|
||||||
/* wait for the background thread to end */
|
/* wait for the background thread to end */
|
||||||
pthread_join(threads[0], NULL);
|
pthread_join(threads[0], NULL);
|
||||||
|
|
||||||
job_sockets = xs_free(job_sockets);
|
job_fifo = xs_free(job_fifo);
|
||||||
job_qitems = xs_free(job_qitems);
|
|
||||||
|
|
||||||
srv_log(xs_fmt("httpd stop %s:%d", address, port));
|
srv_log(xs_fmt("httpd stop %s:%d", address, port));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue