From 4cca157641d5f91bde51baf437a3179e39d0b601 Mon Sep 17 00:00:00 2001 From: default Date: Tue, 7 Feb 2023 13:31:48 +0100 Subject: [PATCH] Output messages are now processed by the pool of threads. --- activitypub.c | 38 ++++++++++++++++++++++++++++++++++++++ data.c | 30 ++++++++++++++++++++---------- snac.h | 2 ++ 3 files changed, 60 insertions(+), 10 deletions(-) diff --git a/activitypub.c b/activitypub.c index af0a9a1..b54845d 100644 --- a/activitypub.c +++ b/activitypub.c @@ -1195,6 +1195,44 @@ void process_queue_item(xs_dict *q_item) char *type = xs_dict_get(q_item, "type"); int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); + if (strcmp(type, "output") == 0) { + int status; + xs_str *inbox = xs_dict_get(q_item, "inbox"); + xs_str *keyid = xs_dict_get(q_item, "keyid"); + xs_str *seckey = xs_dict_get(q_item, "seckey"); + xs_dict *msg = xs_dict_get(q_item, "message"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); + xs *payload = NULL; + int p_size = 0; + + if (xs_is_null(inbox) || xs_is_null(msg) || xs_is_null(keyid) || xs_is_null(seckey)) { + srv_log(xs_fmt("output message error: missing fields")); + return; + } + + /* deliver */ + status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); + + srv_log(xs_fmt("output message: sent to inbox %s %d", inbox, status)); + + if (!valid_status(status)) { + retries++; + + /* error sending; requeue? */ + if (status == 404 || status == 410) + /* explicit error: discard */ + srv_log(xs_fmt("output message: fatal error %s %d", inbox, status)); + else + if (retries > queue_retry_max) + srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); + else { + /* requeue */ + enqueue_output_raw(keyid, seckey, msg, inbox, retries); + srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries)); + } + } + } + else if (strcmp(type, "email") == 0) { /* send this email */ xs_str *msg = xs_dict_get(q_item, "message"); diff --git a/data.c b/data.c index 3827422..ac429fa 100644 --- a/data.c +++ b/data.c @@ -1373,6 +1373,24 @@ void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries) } +void enqueue_output_raw(const char *keyid, const char *seckey, + xs_dict *msg, xs_str *inbox, int retries) +/* enqueues an output message to an inbox */ +{ + xs *qmsg = _new_qmsg("output", msg, retries); + char *ntid = xs_dict_get(qmsg, "ntid"); + xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); + + qmsg = xs_dict_append(qmsg, "inbox", inbox); + qmsg = xs_dict_append(qmsg, "keyid", keyid); + qmsg = xs_dict_append(qmsg, "seckey", seckey); + + qmsg = _enqueue_put(fn, qmsg); + + srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); +} + + void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) /* enqueues an output message to an inbox */ { @@ -1381,17 +1399,9 @@ void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) return; } - xs *qmsg = _new_qmsg("output", msg, retries); - char *ntid = xs_dict_get(qmsg, "ntid"); - xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); + char *seckey = xs_dict_get(snac->key, "secret"); - qmsg = xs_dict_append(qmsg, "inbox", inbox); - qmsg = xs_dict_append(qmsg, "keyid", snac->actor); - qmsg = xs_dict_append(qmsg, "seckey", xs_dict_get(snac->key, "secret")); - - qmsg = _enqueue_put(fn, qmsg); - - snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); + enqueue_output_raw(snac->actor, seckey, msg, inbox, retries); } diff --git a/snac.h b/snac.h index e3ab4a8..3894908 100644 --- a/snac.h +++ b/snac.h @@ -129,6 +129,8 @@ int history_del(snac *snac, char *id); d_char *history_list(snac *snac); void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); +void enqueue_output_raw(const char *keyid, const char *seckey, + xs_dict *msg, xs_str *inbox, int retries); void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); void enqueue_email(xs_str *msg, int retries);