Skip to content

Commit

Permalink
lightningd/plugin_hook.c: Make db_write a chained hook.
Browse files Browse the repository at this point in the history
Fixes: #4219

Changelog-Changed: Plugins: Multiple plugins can now register `db_write` hooks.
  • Loading branch information
ZmnSCPxj committed Nov 23, 2020
1 parent ee0c13e commit 426df0d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 27 deletions.
8 changes: 8 additions & 0 deletions doc/PLUGINS.md
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,14 @@ to error without
committing to the database!
This is the expected way to halt and catch fire.

`db_write` is a parallel-chained hook, i.e., multiple plugins can
register it, and all of them will be invoked simultaneously without
regard for order of registration.
The hook is considered handled if all registered plugins return
`{"result": "continue"}`.
If any plugin returns anything else, `lightningd` will error without
committing to the database.

### `invoice_payment`

This hook is called whenever a valid payment for an unpaid invoice has arrived.
Expand Down
85 changes: 58 additions & 27 deletions lightningd/plugin_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,34 +294,51 @@ bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook,
* annoying, and to make it clear that it's totally synchronous. */

/* Special synchronous hook for db */
static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_SINGLE, NULL,
static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_CHAIN, NULL,
NULL, NULL};
AUTODATA(hooks, &db_write_hook);

/* A `db_write` for one particular plugin hook. */
struct db_write_hook_req {
struct plugin *plugin;
struct plugin_hook_request *ph_req;
size_t *num_hooks;
};

static void db_hook_response(const char *buffer, const jsmntok_t *toks,
const jsmntok_t *idtok,
struct plugin_hook_request *ph_req)
struct db_write_hook_req *dwh_req)
{
const jsmntok_t *resulttok;

resulttok = json_get_member(buffer, toks, "result");
if (!resulttok)
fatal("Plugin returned an invalid response to the db_write "
"hook: %s", buffer);
fatal("Plugin '%s' returned an invalid response to the "
"db_write hook: %s", dwh_req->plugin->cmd, buffer);

/* We expect result: { 'result' : 'continue' }.
* Anything else we abort.
*/
resulttok = json_get_member(buffer, resulttok, "result");
if (resulttok) {
if (!json_tok_streq(buffer, resulttok, "continue"))
fatal("Plugin returned failed db_write: %s.", buffer);
fatal("Plugin '%s' returned failed db_write: %s.",
dwh_req->plugin->cmd,
buffer);
} else
fatal("Plugin returned an invalid result to the db_write "
"hook: %s", buffer);
fatal("Plugin '%s' returned an invalid result to the db_write "
"hook: %s",
dwh_req->plugin->cmd,
buffer);

assert((*dwh_req->num_hooks) != 0);
--(*dwh_req->num_hooks);
/* If there are other runners, do not exit yet. */
if ((*dwh_req->num_hooks) != 0)
return;

/* We're done, exit exclusive loop. */
io_break(ph_req);
io_break(dwh_req->ph_req);
}

void plugin_hook_db_sync(struct db *db)
Expand All @@ -332,35 +349,47 @@ void plugin_hook_db_sync(struct db *db)
void *ret;
struct plugin **plugins;
size_t i;
size_t num_hooks;

const char **changes = db_changes(db);
if (tal_count(hook->hooks) == 0)
num_hooks = tal_count(hook->hooks);
if (num_hooks == 0)
return;

plugins = notleak(tal_arr(NULL, struct plugin *,
tal_count(hook->hooks)));
for (i = 0; i < tal_count(hook->hooks); ++i)
num_hooks));
for (i = 0; i < num_hooks; ++i)
plugins[i] = hook->hooks[i]->plugin;

ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
/* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
ph_req);

ph_req->hook = hook;
ph_req->db = db;
ph_req->plugin = hook->hooks[0]->plugin;

json_add_num(req->stream, "data_version", db_data_version_get(db));

json_array_start(req->stream, "writes");
for (size_t i = 0; i < tal_count(changes); i++)
json_add_string(req->stream, NULL, changes[i]);
json_array_end(req->stream);
jsonrpc_request_end(req);

plugin_request_send(ph_req->plugin, req);
ph_req->cb_arg = &num_hooks;

for (i = 0; i < num_hooks; ++i) {
/* Create an object for this plugin. */
struct db_write_hook_req *dwh_req;
dwh_req = tal(ph_req, struct db_write_hook_req);
dwh_req->plugin = plugins[i];
dwh_req->ph_req = ph_req;
dwh_req->num_hooks = &num_hooks;

/* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
dwh_req);

json_add_num(req->stream, "data_version",
db_data_version_get(db));

json_array_start(req->stream, "writes");
for (size_t i = 0; i < tal_count(changes); i++)
json_add_string(req->stream, NULL, changes[i]);
json_array_end(req->stream);
jsonrpc_request_end(req);

plugin_request_send(plugins[i], req);
}

/* We can be called on way out of an io_loop, which is already breaking.
* That will make this immediately return; save the break value and call
Expand All @@ -371,7 +400,9 @@ void plugin_hook_db_sync(struct db *db)
assert(ret2 == ph_req);
io_break(ret);
}
assert(num_hooks == 0);
tal_free(plugins);
tal_free(ph_req);
}

static void add_deps(const char ***arr,
Expand Down
16 changes: 16 additions & 0 deletions tests/plugins/dbdummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#! /usr/bin/env python3
'''This plugin is a do-nothing backup plugin which just checks that we
can handle multiple backup plugins.
'''

from pyln.client import Plugin

plugin = Plugin()


@plugin.hook('db_write')
def db_write(plugin, **kwargs):
return {'result': 'continue'}


plugin.run()
26 changes: 26 additions & 0 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,32 @@ def test_db_hook(node_factory, executor):
assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()]


@unittest.skipIf(os.getenv('TEST_DB_PROVIDER', 'sqlite3') != 'sqlite3', "Only sqlite3 implements the db_write_hook currently")
def test_db_hook_multiple(node_factory, executor):
"""This tests the db hook for multiple-plugin case."""
dbfile = os.path.join(node_factory.directory, "dblog.sqlite3")
l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/dblog.py'),
'important-plugin': os.path.join(os.getcwd(), 'tests/plugins/dbdummy.py'),
'dblog-file': dbfile})

# It should see the db being created, and sometime later actually get
# initted.
# This precedes startup, so needle already past
assert l1.daemon.is_in_log(r'plugin-dblog.py: deferring \d+ commands')
l1.daemon.logsearch_start = 0
l1.daemon.wait_for_log('plugin-dblog.py: replaying pre-init data:')
l1.daemon.wait_for_log('plugin-dblog.py: CREATE TABLE version \\(version INTEGER\\)')
l1.daemon.wait_for_log("plugin-dblog.py: initialized.* 'startup': True")

l1.stop()

# Databases should be identical.
db1 = sqlite3.connect(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'lightningd.sqlite3'))
db2 = sqlite3.connect(dbfile)

assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()]


def test_utf8_passthrough(node_factory, executor):
l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/utf8.py'),
'log-level': 'io'})
Expand Down

0 comments on commit 426df0d

Please sign in to comment.