From 888a79e58a2de457558c41cc91cf7e8d5df78811 Mon Sep 17 00:00:00 2001 From: default Date: Mon, 11 Dec 2023 17:59:48 +0100 Subject: [PATCH] Call process_input_message() from the shared-inbox input. This way, some garbage like unrequested Deletes from Mastodon and other transient errors (like unaccessible authors) can be short-circuited before propagating the message to the users. --- activitypub.c | 83 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 31 deletions(-) diff --git a/activitypub.c b/activitypub.c index 55a245e..d7f5b37 100644 --- a/activitypub.c +++ b/activitypub.c @@ -2052,45 +2052,66 @@ void process_queue_item(xs_dict *q_item) } else if (strcmp(type, "input") == 0) { - /* redistribute the input message to all users */ - char *ntid = xs_dict_get(q_item, "ntid"); - xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid); xs_dict *msg = xs_dict_get(q_item, "message"); - FILE *f; + xs_dict *req = xs_dict_get(q_item, "req"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); - if ((f = fopen(tmpfn, "w")) != NULL) { - xs_json_dump(q_item, 4, f); - fclose(f); - } + /* do some instance-level checks */ + int r = process_input_message(NULL, msg, req); - xs *users = user_list(); - xs_list *p = users; - char *v; - int cnt = 0; + if (r == 0) { + /* transient error? retry */ + int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); - while (xs_list_iter(&p, &v)) { - snac user; - - if (user_open(&user, v)) { - if (is_msg_for_me(&user, msg)) { - xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); - - snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); - - if (link(tmpfn, fn) < 0) - srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); - - cnt++; - } - - user_free(&user); + if (retries > queue_retry_max) + srv_log(xs_fmt("shared input giving up")); + else { + /* reenqueue */ + enqueue_shared_input(msg, req, retries + 1); + srv_log(xs_fmt("shared input requeue #%d", retries + 1)); } } + else + if (r == 2) { + /* redistribute the input message to all users */ + char *ntid = xs_dict_get(q_item, "ntid"); + xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid); + FILE *f; - unlink(tmpfn); + if ((f = fopen(tmpfn, "w")) != NULL) { + xs_json_dump(q_item, 4, f); + fclose(f); + } - if (cnt == 0) - srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); + xs *users = user_list(); + xs_list *p = users; + char *v; + int cnt = 0; + + while (xs_list_iter(&p, &v)) { + snac user; + + if (user_open(&user, v)) { + if (is_msg_for_me(&user, msg)) { + xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); + + snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); + + if (link(tmpfn, fn) < 0) + srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); + + cnt++; + } + + user_free(&user); + } + } + + unlink(tmpfn); + + if (cnt == 0) + srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); + } } else srv_log(xs_fmt("unexpected q_item type '%s'", type));