diff --git a/activitypub.c b/activitypub.c index a8a570c..514bf5d 100644 --- a/activitypub.c +++ b/activitypub.c @@ -1798,7 +1798,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item) if (inbox != NULL) { /* add to the set and, if it's not there, send message */ if (xs_set_add(&inboxes, inbox) == 1) - enqueue_output(snac, msg, inbox, 0); + enqueue_output(snac, msg, inbox, 0, 0); } else snac_log(snac, xs_fmt("cannot find inbox for %s", actor)); @@ -1812,7 +1812,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item) p = shibx; while (xs_list_iter(&p, &inbox)) { if (xs_set_add(&inboxes, inbox) == 1) - enqueue_output(snac, msg, inbox, 0); + enqueue_output(snac, msg, inbox, 0, 0); } } @@ -1896,6 +1896,7 @@ void process_queue_item(xs_dict *q_item) xs_str *seckey = xs_dict_get(q_item, "seckey"); xs_dict *msg = xs_dict_get(q_item, "message"); int retries = xs_number_get(xs_dict_get(q_item, "retries")); + int p_status = xs_number_get(xs_dict_get(q_item, "p_status")); xs *payload = NULL; int p_size = 0; @@ -1909,8 +1910,9 @@ void process_queue_item(xs_dict *q_item) return; } - /* deliver */ - status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); + /* deliver (if previous error status was a timeout, try now longer) */ + status = send_to_inbox_raw(keyid, seckey, inbox, msg, + &payload, &p_size, p_status == 599 ? 20 : 3); if (payload) { if (p_size > 64) { @@ -1934,6 +1936,11 @@ void process_queue_item(xs_dict *q_item) if (!valid_status(status)) { retries++; + /* if it's not the first time it fails with a timeout, + penalize the server by skipping one retry */ + if (p_status == status && status == 499) + retries++; + /* error sending; requeue? */ if (status == 400 || status == 404 || status == 410 || status < 0) /* explicit error: discard */ @@ -1943,7 +1950,7 @@ void process_queue_item(xs_dict *q_item) srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); else { /* requeue */ - enqueue_output_raw(keyid, seckey, msg, inbox, retries); + enqueue_output_raw(keyid, seckey, msg, inbox, retries, status); srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries)); } } diff --git a/data.c b/data.c index 3b22114..cf4d39b 100644 --- a/data.c +++ b/data.c @@ -2041,13 +2041,16 @@ void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retri void enqueue_output_raw(const char *keyid, const char *seckey, - xs_dict *msg, xs_str *inbox, int retries) + xs_dict *msg, xs_str *inbox, int retries, int p_status) /* enqueues an output message to an inbox */ { xs *qmsg = _new_qmsg("output", msg, retries); char *ntid = xs_dict_get(qmsg, "ntid"); xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); + xs *ns = xs_number_new(p_status); + qmsg = xs_dict_append(qmsg, "p_status", ns); + qmsg = xs_dict_append(qmsg, "inbox", inbox); qmsg = xs_dict_append(qmsg, "keyid", keyid); qmsg = xs_dict_append(qmsg, "seckey", seckey); @@ -2062,7 +2065,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey, } -void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) +void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries, int p_status) /* enqueues an output message to an inbox */ { if (xs_startswith(inbox, snac->actor)) { @@ -2072,7 +2075,7 @@ void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) char *seckey = xs_dict_get(snac->key, "secret"); - enqueue_output_raw(snac->actor, seckey, msg, inbox, retries); + enqueue_output_raw(snac->actor, seckey, msg, inbox, retries, p_status); } @@ -2082,7 +2085,7 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int xs *inbox = get_actor_inbox(snac, actor); if (!xs_is_null(inbox)) - enqueue_output(snac, msg, inbox, retries); + enqueue_output(snac, msg, inbox, retries, 0); else snac_log(snac, xs_fmt("enqueue_output_by_actor cannot get inbox %s", actor)); } diff --git a/snac.h b/snac.h index 9800bb5..7cf3d7e 100644 --- a/snac.h +++ b/snac.h @@ -178,8 +178,8 @@ int instance_unblock(const char *instance); void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retries); void enqueue_output_raw(const char *keyid, const char *seckey, - xs_dict *msg, xs_str *inbox, int retries); -void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); + xs_dict *msg, xs_str *inbox, int retries, int p_status); +void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries, int p_status); void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int retries); void enqueue_email(xs_str *msg, int retries); void enqueue_telegram(const xs_str *msg, const char *bot, const char *chat_id);