Give more retry time to timedout connections.

This commit is contained in:
default 2023-09-29 10:34:22 +02:00
parent 9cce2ee119
commit 60e8953a69
3 changed files with 21 additions and 11 deletions

View File

@ -1798,7 +1798,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
if (inbox != NULL) {
/* add to the set and, if it's not there, send message */
if (xs_set_add(&inboxes, inbox) == 1)
enqueue_output(snac, msg, inbox, 0);
enqueue_output(snac, msg, inbox, 0, 0);
}
else
snac_log(snac, xs_fmt("cannot find inbox for %s", actor));
@ -1812,7 +1812,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
p = shibx;
while (xs_list_iter(&p, &inbox)) {
if (xs_set_add(&inboxes, inbox) == 1)
enqueue_output(snac, msg, inbox, 0);
enqueue_output(snac, msg, inbox, 0, 0);
}
}
@ -1896,6 +1896,7 @@ void process_queue_item(xs_dict *q_item)
xs_str *seckey = xs_dict_get(q_item, "seckey");
xs_dict *msg = xs_dict_get(q_item, "message");
int retries = xs_number_get(xs_dict_get(q_item, "retries"));
int p_status = xs_number_get(xs_dict_get(q_item, "p_status"));
xs *payload = NULL;
int p_size = 0;
@ -1909,8 +1910,9 @@ void process_queue_item(xs_dict *q_item)
return;
}
/* deliver */
status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
/* deliver (if previous error status was a timeout, try now longer) */
status = send_to_inbox_raw(keyid, seckey, inbox, msg,
&payload, &p_size, p_status == 599 ? 20 : 3);
if (payload) {
if (p_size > 64) {
@ -1934,6 +1936,11 @@ void process_queue_item(xs_dict *q_item)
if (!valid_status(status)) {
retries++;
/* if it's not the first time it fails with a timeout,
penalize the server by skipping one retry */
if (p_status == status && status == 499)
retries++;
/* error sending; requeue? */
if (status == 400 || status == 404 || status == 410 || status < 0)
/* explicit error: discard */
@ -1943,7 +1950,7 @@ void process_queue_item(xs_dict *q_item)
srv_log(xs_fmt("output message: giving up %s %d", inbox, status));
else {
/* requeue */
enqueue_output_raw(keyid, seckey, msg, inbox, retries);
enqueue_output_raw(keyid, seckey, msg, inbox, retries, status);
srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries));
}
}

11
data.c
View File

@ -2041,13 +2041,16 @@ void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retri
void enqueue_output_raw(const char *keyid, const char *seckey,
xs_dict *msg, xs_str *inbox, int retries)
xs_dict *msg, xs_str *inbox, int retries, int p_status)
/* enqueues an output message to an inbox */
{
xs *qmsg = _new_qmsg("output", msg, retries);
char *ntid = xs_dict_get(qmsg, "ntid");
xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid);
xs *ns = xs_number_new(p_status);
qmsg = xs_dict_append(qmsg, "p_status", ns);
qmsg = xs_dict_append(qmsg, "inbox", inbox);
qmsg = xs_dict_append(qmsg, "keyid", keyid);
qmsg = xs_dict_append(qmsg, "seckey", seckey);
@ -2062,7 +2065,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey,
}
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries)
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries, int p_status)
/* enqueues an output message to an inbox */
{
if (xs_startswith(inbox, snac->actor)) {
@ -2072,7 +2075,7 @@ void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries)
char *seckey = xs_dict_get(snac->key, "secret");
enqueue_output_raw(snac->actor, seckey, msg, inbox, retries);
enqueue_output_raw(snac->actor, seckey, msg, inbox, retries, p_status);
}
@ -2082,7 +2085,7 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int
xs *inbox = get_actor_inbox(snac, actor);
if (!xs_is_null(inbox))
enqueue_output(snac, msg, inbox, retries);
enqueue_output(snac, msg, inbox, retries, 0);
else
snac_log(snac, xs_fmt("enqueue_output_by_actor cannot get inbox %s", actor));
}

4
snac.h
View File

@ -178,8 +178,8 @@ int instance_unblock(const char *instance);
void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retries);
void enqueue_output_raw(const char *keyid, const char *seckey,
xs_dict *msg, xs_str *inbox, int retries);
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries);
xs_dict *msg, xs_str *inbox, int retries, int p_status);
void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries, int p_status);
void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int retries);
void enqueue_email(xs_str *msg, int retries);
void enqueue_telegram(const xs_str *msg, const char *bot, const char *chat_id);