autoclean: call list in easy stages.
listforwards on a large node can easily run out of memory.  Sip, don't
gulp!

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
rustyrussell committed May 9, 2024
1 parent 787bc87 commit 2a8065e
Showing 3 changed files with 129 additions and 13 deletions.
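
The change pages through each list command with the wait-index parameters (index=created, start, limit) instead of pulling every entry in a single response, and first calls the wait command to learn how far the created index currently goes. As a rough illustration of the pattern, here is a minimal standalone sketch (not the plugin code itself; fetch_batch() and the numbers are hypothetical stand-ins):

/* Minimal sketch, assuming a hypothetical fetch_batch() in place of the real
 * paginated listforwards/listsendpays/listinvoices request.  The plugin first
 * asks `wait` for the current "created" index, then walks it in windows of
 * max_entries_per_call instead of listing everything at once. */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_ENTRIES_PER_CALL 10000

/* Stand-in for one paginated list call: index=created, start, limit. */
static void fetch_batch(uint64_t start, uint64_t limit)
{
	printf("listforwards index=created start=%" PRIu64 " limit=%" PRIu64 "\n",
	       start, limit);
}

int main(void)
{
	/* Hypothetical total; in the plugin this comes from
	 * `wait subsystem=forwards indexname=created nextvalue=0`. */
	uint64_t created_index_max = 123456;

	for (uint64_t start = 0; start < created_index_max;
	     start += MAX_ENTRIES_PER_CALL)
		fetch_batch(start, MAX_ENTRIES_PER_CALL);
	return 0;
}
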
107 changes: 94 additions & 13 deletions plugins/autoclean.c
@@ -13,6 +13,7 @@ static struct clean_info *timer_cinfo;
static struct plugin *plugin;
/* This is NULL if it's running now. */
static struct plugin_timer *cleantimer;
static u64 max_entries_per_call = 10000;

enum subsystem_type {
FORWARDS,
@@ -34,8 +35,8 @@ struct subsystem_ops {
/* "success" and "failure" names for JSON formatting. */
const char *names[NUM_SUBSYSTEM_VARIANTS];

/* name of "list" command */
const char *list_command;
/* name of system for wait and "list" */
const char *system_name;

/* Name of array inside "list" command return */
const char *arr_name;
@@ -89,23 +90,23 @@ static void add_forward_del_fields(struct out_req *req,

static const struct subsystem_ops subsystem_ops[NUM_SUBSYSTEM_TYPES] = {
{ {"succeededforwards", "failedforwards"},
"listforwards",
"forwards",
"forwards",
"\"in_channel\":true,\"in_htlc_id\":true,\"resolved_time\":true,\"received_time\":true,\"status\":true",
"delforward",
get_listforwards_variant,
add_forward_del_fields,
},
{ {"succeededpays", "failedpays"},
"listsendpays",
"sendpays",
"payments",
"\"created_at\":true,\"status\":true,\"payment_hash\":true,\"groupid\":true,\"partid\":true",
"delpay",
get_listsendpays_variant,
add_sendpays_del_fields,
},
{ {"paidinvoices", "expiredinvoices"},
"listinvoices",
"invoices",
"invoices",
"\"label\":true,\"status\":true,\"expires_at\":true,\"paid_at\":true",
"delinvoice",
@@ -170,6 +171,10 @@ struct per_subsystem {
struct clean_info *cinfo;
enum subsystem_type type;

/* How far are we through the listing? */
u64 offset, max;

/* How many did we ignore? */
u64 num_uncleaned;
struct per_variant variants[NUM_SUBSYSTEM_VARIANTS];
};
@@ -202,6 +207,7 @@ static const struct subsystem_ops *get_subsystem_ops(const struct per_subsystem

/* Mutual recursion */
static void do_clean_timer(void *unused);
static struct command_result *do_clean(struct clean_info *cinfo);

static struct clean_info *new_clean_info(const tal_t *ctx,
struct command *cmd)
@@ -295,7 +301,8 @@ static struct command_result *clean_finished_one(struct clean_info *cinfo)
if (--cinfo->cleanup_reqs_remaining > 0)
return command_still_pending(cinfo->cmd);

return clean_finished(cinfo);
/* See if there are more entries we need to list. */
return do_clean(cinfo);
}

static struct command_result *del_done(struct command *cmd,
@@ -509,6 +516,7 @@ static struct command_result *list_done(struct command *cmd,
send_outreq(plugin, req);
}

subsystem->offset += max_entries_per_call;
return clean_finished_one(subsystem->cinfo);
}

@@ -517,8 +525,8 @@ static struct command_result *list_failed(struct command *cmd,
const jsmntok_t *result,
struct per_subsystem *subsystem)
{
plugin_err(plugin, "Failed '%s': '%.*s'",
get_subsystem_ops(subsystem)->list_command,
plugin_err(plugin, "Failed 'list%s': '%.*s'",
get_subsystem_ops(subsystem)->system_name,
json_tok_full_len(result),
json_tok_full(buf, result));
}
@@ -533,9 +541,7 @@ static struct command_result *do_clean(struct clean_info *cinfo)
const char *filter;
const struct subsystem_ops *ops = get_subsystem_ops(ps);

ps->num_uncleaned = 0;
for (size_t j = 0; j < NUM_SUBSYSTEM_VARIANTS; j++) {
ps->variants[j].num_cleaned = 0;
if (ps->variants[j].age)
have_variant = true;
}
@@ -544,13 +550,24 @@ static struct command_result *do_clean(struct clean_info *cinfo)
if (!have_variant)
continue;

/* Don't bother if we're past the end already. */
if (ps->offset >= ps->max)
continue;

filter = tal_fmt(tmpctx, "{\"%s\":[{%s}]}",
ops->arr_name, ops->list_filter);
req = jsonrpc_request_with_filter_start(plugin, NULL,
ops->list_command,
tal_fmt(tmpctx,
"list%s",
ops->system_name),
filter,
list_done, list_failed,
ps);
/* Don't overwhelm lightningd or us if there are millions of
* entries! */
json_add_string(req->js, "index", "created");
json_add_u64(req->js, "start", ps->offset);
json_add_u64(req->js, "limit", max_entries_per_call);
send_outreq(plugin, req);
cinfo->cleanup_reqs_remaining++;
}
@@ -560,12 +577,71 @@ static struct command_result *do_clean(struct clean_info *cinfo)
return clean_finished(cinfo);
}

static struct command_result *wait_done(struct command *cmd,
const char *buf,
const jsmntok_t *result,
struct per_subsystem *ps)
{
const char *err;

err = json_scan(tmpctx, buf, result, "{created:%}",
JSON_SCAN(json_to_u64, &ps->max));
if (err)
plugin_err(plugin, "Failed parsing wait response: (%s): '%.*s'",
err,
json_tok_full_len(result),
json_tok_full(buf, result));

/* We do three of these, make sure they're all complete. */
assert(ps->cinfo->cleanup_reqs_remaining != 0);
if (--ps->cinfo->cleanup_reqs_remaining > 0)
return command_still_pending(ps->cinfo->cmd);

return do_clean(ps->cinfo);
}

static struct command_result *wait_failed(struct command *cmd,
const char *buf,
const jsmntok_t *result,
struct per_subsystem *subsystem)
{
plugin_err(plugin, "Failed wait '%s': '%.*s'",
get_subsystem_ops(subsystem)->system_name,
json_tok_full_len(result),
json_tok_full(buf, result));
}

static struct command_result *start_clean(struct clean_info *cinfo)
{
cinfo->cleanup_reqs_remaining = 0;

/* We have to get max indexes first. */
for (size_t i = 0; i < NUM_SUBSYSTEM_TYPES; i++) {
struct per_subsystem *ps = &cinfo->per_subsystem[i];
const struct subsystem_ops *ops = get_subsystem_ops(ps);
struct out_req *req;

ps->offset = 0;

req = jsonrpc_request_start(plugin, NULL,
"wait",
wait_done, wait_failed, ps);
json_add_string(req->js, "subsystem", ops->system_name);
json_add_string(req->js, "indexname", "created");
json_add_u64(req->js, "nextvalue", 0);
send_outreq(plugin, req);
cinfo->cleanup_reqs_remaining++;
}

return command_still_pending(cinfo->cmd);
}

/* Needs a different signature than do_clean */
static void do_clean_timer(void *unused)
{
assert(timer_cinfo->cleanup_reqs_remaining == 0);
cleantimer = NULL;
do_clean(timer_cinfo);
start_clean(timer_cinfo);
}

static struct command_result *param_subsystem(struct command *cmd,
@@ -651,7 +727,7 @@ static struct command_result *json_autoclean_once(struct command *cmd,
cinfo = new_clean_info(cmd, cmd);
get_per_variant(cinfo, sv)->age = *age;

return do_clean(cinfo);
return start_clean(cinfo);
}

static void memleak_mark_timer_cinfo(struct plugin *plugin,
@@ -760,5 +836,10 @@ int main(int argc, char *argv[])
"How old do expired invoices have to be before deletion (0 = never)",
u64_option,
&timer_cinfo->per_subsystem[INVOICES].variants[FAILURE].age),
plugin_option_dev_dynamic("dev-autoclean-max-batch",
"int",
"Maximum cleans to do at a time",
u64_option,
&max_entries_per_call),
NULL);
}
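
The control flow above leans on cleanup_reqs_remaining as a simple in-flight counter: start_clean() issues one wait request per subsystem, do_clean() issues one paginated list per subsystem, and whichever callback decrements the counter to zero drives the next stage, so listing and deleting proceed batch by batch until offset reaches max. A toy sketch of that pattern (hypothetical names, synchronous for brevity, not the libplugin API):

#include <assert.h>
#include <stddef.h>
#include <stdio.h>

struct clean_state {
	size_t reqs_remaining;	/* in-flight requests this round */
	unsigned round;		/* how many batches we have driven */
};

static void start_round(struct clean_state *s);

/* Called as each request completes; the last one starts the next round. */
static void one_done(struct clean_state *s)
{
	assert(s->reqs_remaining != 0);
	if (--s->reqs_remaining > 0)
		return;
	start_round(s);
}

static void start_round(struct clean_state *s)
{
	if (s->round == 3) {	/* arbitrary stop for the sketch */
		printf("all batches done\n");
		return;
	}
	s->round++;
	s->reqs_remaining = 3;	/* e.g. forwards, sendpays, invoices */
	printf("round %u: %zu requests issued\n", s->round, s->reqs_remaining);
	/* The plugin's requests are asynchronous; here we complete them inline. */
	for (size_t i = 0; i < 3; i++)
		one_done(s);
}

int main(void)
{
	struct clean_state s = { 0, 0 };
	start_round(&s);
	return 0;
}
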
3 changes: 3 additions & 0 deletions plugins/libplugin.h
@@ -455,6 +455,9 @@ bool plugin_developer_mode(const struct plugin *plugin);
#define plugin_option_dev(name, type, description, set, arg) \
plugin_option_((name), (type), (description), (set), (arg), true, NULL, NULL, false)

#define plugin_option_dev_dynamic(name, type, description, set, arg) \
plugin_option_((name), (type), (description), (set), (arg), true, NULL, NULL, true)

#define plugin_option_dynamic(name, type, description, set, arg) \
plugin_option_((name), (type), (description), (set), (arg), false, NULL, NULL, true)
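
As the surrounding macros show, the only difference is the pair of booleans passed to plugin_option_(): the new plugin_option_dev_dynamic keeps the true that marks the option as developer-only and also passes true for the final (dynamic) argument, which is what lets the test below change dev-autoclean-max-batch at runtime via setconfig.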

32 changes: 32 additions & 0 deletions tests/test_plugin.py
@@ -4317,3 +4317,35 @@ def test_plugin_startdir_lol(node_factory):
"""Though we fail to start many of them, we don't crash!"""
l1 = node_factory.get_node(broken_log='.*')
l1.rpc.plugin_startdir(os.path.join(os.getcwd(), 'tests/plugins'))


def test_autoclean_batch(node_factory):
l1 = node_factory.get_node(1)

# Many expired invoices
for i in range(100):
l1.rpc.invoice(amount_msat=12300, label=f'inv1{i}', description='description', expiry=1)

time.sleep(3)
l1.rpc.setconfig('dev-autoclean-max-batch', 2)

# Test manual clean
ret = l1.rpc.autoclean_once('expiredinvoices', 1)
assert ret == {'autoclean': {'expiredinvoices': {'cleaned': 100, 'uncleaned': 0}}}

for i in range(100):
l1.rpc.invoice(amount_msat=12300, label=f'inv2{i}', description='description', expiry=1)

time.sleep(3)

# Test cycle clean
assert (l1.rpc.autoclean_status('expiredinvoices')
== {'autoclean': {'expiredinvoices': {'enabled': False, 'cleaned': 100}}})

l1.rpc.setconfig('autoclean-expiredinvoices-age', 2)
assert (l1.rpc.autoclean_status('expiredinvoices')
== {'autoclean': {'expiredinvoices': {'enabled': True, 'cleaned': 100, 'age': 2}}})

l1.rpc.setconfig('autoclean-cycle', 5)
wait_for(lambda: l1.rpc.autoclean_status('expiredinvoices')
== {'autoclean': {'expiredinvoices': {'enabled': True, 'cleaned': 200, 'age': 2}}})
