From c2524323a932d383a10c0e3e3adbffdada085515 Mon Sep 17 00:00:00 2001 From: default Date: Tue, 31 Jan 2023 22:30:34 +0100 Subject: [PATCH] New function process_queue_item(). --- activitypub.c | 192 ++++++++++++++++++++++++++------------------------ 1 file changed, 100 insertions(+), 92 deletions(-) diff --git a/activitypub.c b/activitypub.c index 9bffb5b..ba6e45e 100644 --- a/activitypub.c +++ b/activitypub.c @@ -845,7 +845,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg) /** queues **/ -int process_message(snac *snac, char *msg, char *req) +int process_input_message(snac *snac, char *msg, char *req) /* processes an ActivityPub message from the input queue */ { /* actor and type exist, were checked previously */ @@ -1065,110 +1065,118 @@ int send_email(char *msg) } +void process_queue_item(snac *snac, xs_dict *q_item) +/* processes an item from the queue */ +{ + char *type; + int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); + + if ((type = xs_dict_get(q_item, "type")) == NULL) + type = "output"; + + if (strcmp(type, "message") == 0) { + xs_dict *msg = xs_dict_get(q_item, "message"); + xs *inboxes = inbox_list(snac, msg); + xs_list *p; + xs_str *inbox; + + p = inboxes; + while (xs_list_iter(&p, &inbox)) { + enqueue_output(snac, msg, inbox, 0); + } + } + else + if (strcmp(type, "output") == 0) { + int status; + xs_str *inbox = xs_dict_get(q_item, "inbox"); + xs_dict *msg = xs_dict_get(q_item, "message"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); + xs *payload = NULL; + int p_size = 0; + + if (xs_is_null(inbox) || xs_is_null(msg)) + return; + + /* deliver */ + status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); + + snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status)); + + if (!valid_status(status)) { + /* error sending; requeue? */ + if (status == 404 || status == 410) + /* explicit error: discard */ + snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status)); + else + if (retries > queue_retry_max) + snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status)); + else { + /* requeue */ + enqueue_output(snac, msg, inbox, retries + 1); + snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1)); + } + } + } + else + if (strcmp(type, "input") == 0) { + /* process the message */ + xs_dict *msg = xs_dict_get(q_item, "message"); + xs_dict *req = xs_dict_get(q_item, "req"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); + + if (xs_is_null(msg)) + return; + + if (!process_input_message(snac, msg, req)) { + if (retries > queue_retry_max) + snac_log(snac, xs_fmt("process_queue input giving up")); + else { + /* reenqueue */ + enqueue_input(snac, msg, req, retries + 1); + snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1)); + } + } + } + else + if (strcmp(type, "email") == 0) { + /* send this email */ + xs_str *msg = xs_dict_get(q_item, "message"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); + + if (!send_email(msg)) + snac_debug(snac, 1, xs_fmt("email message sent")); + else { + if (retries > queue_retry_max) + snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); + else { + /* requeue */ + snac_log(snac, xs_fmt( + "process_queue email requeue #%d (errno: %d)", retries + 1, errno)); + + enqueue_email(snac, msg, retries + 1); + } + } + } +} + + void process_queue(snac *snac) /* processes the queue */ { - xs *list; - char *p, *fn; - int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); + xs *list = queue(snac); - list = queue(snac); + xs_list *p = list; + xs_str *fn; - p = list; while (xs_list_iter(&p, &fn)) { xs *q_item = dequeue(snac, fn); - char *type; if (q_item == NULL) { snac_log(snac, xs_fmt("process_queue q_item error")); continue; } - if ((type = xs_dict_get(q_item, "type")) == NULL) - type = "output"; - - if (strcmp(type, "message") == 0) { - char *msg = xs_dict_get(q_item, "message"); - xs *inboxes = inbox_list(snac, msg); - char *p, *v; - - p = inboxes; - while (xs_list_iter(&p, &v)) { - enqueue_output(snac, msg, v, 0); - } - } - else - if (strcmp(type, "output") == 0) { - int status; - char *inbox = xs_dict_get(q_item, "inbox"); - char *msg = xs_dict_get(q_item, "message"); - int retries = xs_number_get(xs_dict_get(q_item, "retries")); - xs *payload = NULL; - int p_size = 0; - - if (xs_is_null(inbox) || xs_is_null(msg)) - continue; - - /* deliver */ - status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); - - snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status)); - - if (!valid_status(status)) { - /* error sending; requeue? */ - if (status == 404 || status == 410) - /* explicit error: discard */ - snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status)); - else - if (retries > queue_retry_max) - snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status)); - else { - /* requeue */ - enqueue_output(snac, msg, inbox, retries + 1); - snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1)); - } - } - } - else - if (strcmp(type, "input") == 0) { - /* process the message */ - char *msg = xs_dict_get(q_item, "message"); - char *req = xs_dict_get(q_item, "req"); - int retries = xs_number_get(xs_dict_get(q_item, "retries")); - - if (xs_is_null(msg)) - continue; - - if (!process_message(snac, msg, req)) { - if (retries > queue_retry_max) - snac_log(snac, xs_fmt("process_queue input giving up")); - else { - /* reenqueue */ - enqueue_input(snac, msg, req, retries + 1); - snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1)); - } - } - } - else - if (strcmp(type, "email") == 0) { - /* send this email */ - char *msg = xs_dict_get(q_item, "message"); - int retries = xs_number_get(xs_dict_get(q_item, "retries")); - - if (!send_email(msg)) - snac_debug(snac, 1, xs_fmt("email message sent")); - else { - if (retries > queue_retry_max) - snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); - else { - /* requeue */ - snac_log(snac, xs_fmt( - "process_queue email requeue #%d (errno: %d)", retries + 1, errno)); - - enqueue_email(snac, msg, retries + 1); - } - } - } + process_queue_item(snac, q_item); } }