diff --git a/activitypub.c b/activitypub.c index 5cc059d..9c9ea7c 100644 --- a/activitypub.c +++ b/activitypub.c @@ -839,7 +839,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg) body = xs_str_cat(body, s1); } - enqueue_email(snac, body, 0); + enqueue_email(body, 0); } @@ -1137,26 +1137,6 @@ void process_user_queue_item(snac *snac, xs_dict *q_item) } } } - else - if (strcmp(type, "email") == 0) { - /* send this email */ - xs_str *msg = xs_dict_get(q_item, "message"); - int retries = xs_number_get(xs_dict_get(q_item, "retries")); - - if (!send_email(msg)) - snac_debug(snac, 1, xs_fmt("email message sent")); - else { - if (retries > queue_retry_max) - snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); - else { - /* requeue */ - snac_log(snac, xs_fmt( - "process_queue email requeue #%d (errno: %d)", retries + 1, errno)); - - enqueue_email(snac, msg, retries + 1); - } - } - } } @@ -1184,6 +1164,30 @@ void process_user_queue(snac *snac) void process_queue_item(xs_dict *q_item) /* processes an item from the global queue */ { + char *type = xs_dict_get(q_item, "type"); + int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); + + if (strcmp(type, "email") == 0) { + /* send this email */ + xs_str *msg = xs_dict_get(q_item, "message"); + int retries = xs_number_get(xs_dict_get(q_item, "retries")); + + if (!send_email(msg)) + srv_debug(1, xs_fmt("email message sent")); + else { + retries++; + + if (retries > queue_retry_max) + srv_log(xs_fmt("process_queue email giving up (errno: %d)", errno)); + else { + /* requeue */ + srv_log(xs_fmt( + "process_queue email requeue #%d (errno: %d)", retries, errno)); + + enqueue_email(msg, retries); + } + } + } } diff --git a/data.c b/data.c index 96583aa..da8d422 100644 --- a/data.c +++ b/data.c @@ -1389,16 +1389,16 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retrie } -void enqueue_email(snac *snac, xs_str *msg, int retries) +void enqueue_email(xs_str *msg, int retries) /* enqueues an email message to be sent */ { xs *qmsg = _new_qmsg("email", msg, retries); char *ntid = xs_dict_get(qmsg, "ntid"); - xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); + xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); qmsg = _enqueue_put(fn, qmsg); - snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries)); + srv_debug(1, xs_fmt("enqueue_email %d", retries)); } diff --git a/httpd.c b/httpd.c index c47b841..7932982 100644 --- a/httpd.c +++ b/httpd.c @@ -280,6 +280,9 @@ static void *queue_thread(void *arg) } } + /* global queue */ + process_queue(); + /* time to purge? */ if ((t = time(NULL)) > purge_time) { pthread_t pth; diff --git a/snac.h b/snac.h index 55ed49d..78007f7 100644 --- a/snac.h +++ b/snac.h @@ -126,10 +126,11 @@ d_char *history_list(snac *snac); void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); -void enqueue_email(snac *snac, xs_str *msg, int retries); +void enqueue_email(xs_str *msg, int retries); void enqueue_message(snac *snac, char *msg); xs_list *user_queue(snac *snac); +xs_list *queue(void); xs_dict *dequeue(const char *fn); void purge(snac *snac); @@ -165,7 +166,11 @@ int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_s d_char *get_actor_inbox(snac *snac, char *actor); int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout); int is_msg_public(snac *snac, char *msg); + void process_user_queue(snac *snac); + +void process_queue(void); + void post(snac *snac, char *msg); int activitypub_get_handler(d_char *req, char *q_path, char **body, int *b_size, char **ctype);