From 5036cb5e1134d99c967f0de5057801a9b0af96d9 Mon Sep 17 00:00:00 2001 From: default Date: Thu, 2 Mar 2023 12:38:02 +0100 Subject: [PATCH] Connection jobs are treated as urgent. --- activitypub.c | 2 +- data.c | 2 +- httpd.c | 16 ++++++++++------ snac.h | 2 +- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/activitypub.c b/activitypub.c index ee31ecd..9c47590 100644 --- a/activitypub.c +++ b/activitypub.c @@ -1316,7 +1316,7 @@ int process_queue(void) xs *q_item = dequeue(fn); if (q_item != NULL) { - job_post(q_item); + job_post(q_item, 0); cnt++; } } diff --git a/data.c b/data.c index 4403bf4..71ebf68 100644 --- a/data.c +++ b/data.c @@ -1497,7 +1497,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey, /* if it's to be sent right now, bypass the disk queue and post the job */ if (retries == 0 && job_fifo_ready()) - job_post(qmsg); + job_post(qmsg, 0); else { qmsg = _enqueue_put(fn, qmsg); srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); diff --git a/httpd.c b/httpd.c index 1d91b34..d32903e 100644 --- a/httpd.c +++ b/httpd.c @@ -262,7 +262,7 @@ int job_fifo_ready(void) } -void job_post(const xs_val *job) +void job_post(const xs_val *job, int urgent) /* posts a job for the threads to process it */ { if (job != NULL) { @@ -270,8 +270,12 @@ void job_post(const xs_val *job) pthread_mutex_lock(&job_mutex); /* add to the fifo */ - if (job_fifo != NULL) - job_fifo = xs_list_append(job_fifo, job); + if (job_fifo != NULL) { + if (urgent) + job_fifo = xs_list_insert(job_fifo, 0, job); + else + job_fifo = xs_list_append(job_fifo, job); + } /* unlock the mutex */ pthread_mutex_unlock(&job_mutex); @@ -386,7 +390,7 @@ static void *background_thread(void *arg) xs *q_item = xs_dict_new(); q_item = xs_dict_append(q_item, "type", "purge"); - job_post(q_item); + job_post(q_item, 0); } if (cnt == 0) { @@ -485,7 +489,7 @@ void httpd(void) if (f != NULL) { xs *job = xs_data_new(&f, sizeof(FILE *)); - job_post(job); + job_post(job, 1); } else break; @@ -496,7 +500,7 @@ void httpd(void) /* send as many empty jobs as working threads */ for (n = 1; n < n_threads; n++) - job_post(NULL); + job_post(NULL, 0); /* wait for all the threads to exit */ for (n = 0; n < n_threads; n++) diff --git a/snac.h b/snac.h index 9ea3619..6766c55 100644 --- a/snac.h +++ b/snac.h @@ -218,5 +218,5 @@ int adduser(const char *uid); int resetpwd(snac *snac); int job_fifo_ready(void); -void job_post(const xs_val *job); +void job_post(const xs_val *job, int urgent); void job_wait(xs_val **job);