From 222adba56a09c0f8fb3961b6b480d14d2795e081 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 2 Sep 2024 12:09:59 -0400 Subject: [PATCH 1/3] Rewrite leader__barrier and leader__exec Signed-off-by: Cole Miller --- src/gateway.c | 99 ++++--- src/leader.c | 508 ++++++++++++++++++++++------------- src/leader.h | 56 ++-- test/unit/test_gateway.c | 9 +- test/unit/test_replication.c | 47 ++-- 5 files changed, 461 insertions(+), 258 deletions(-) diff --git a/src/gateway.c b/src/gateway.c index 8cfe4fac1..19ae9e709 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -309,10 +309,8 @@ static int handle_open(struct gateway *g, struct handle *req) return 0; } -static void prepareBarrierCb(struct barrier *barrier, int status) +static void prepare_bottom_half(struct gateway *g, int status) { - tracef("prepare barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; struct response_stmt response_v0 = { 0 }; struct response_stmt_with_offset response_v1 = { 0 }; @@ -381,6 +379,13 @@ static void prepareBarrierCb(struct barrier *barrier, int status) } } +static void prepare_barrier_cb(struct barrier *barrier, int status) +{ + tracef("prepare barrier cb status:%d", status); + struct gateway *g = barrier->data; + prepare_bottom_half(g, status); +} + static int handle_prepare(struct gateway *g, struct handle *req) { tracef("handle prepare"); @@ -413,8 +418,10 @@ static int handle_prepare(struct gateway *g, struct handle *req) req->stmt_id = stmt->id; req->sql = request.sql; g->req = req; - rc = leader__barrier(g->leader, &g->barrier, prepareBarrierCb); - if (rc != 0) { + rc = leader_barrier_v2(g->leader, &g->barrier, prepare_barrier_cb); + if (rc == LEADER_NOT_ASYNC) { + prepare_bottom_half(g, 0); + } else if (rc != 0) { tracef("handle prepare barrier failed %d", rc); stmt__registry_del(&g->stmts, stmt); g->req = NULL; @@ -478,7 +485,6 @@ static int handle_exec(struct gateway *g, struct handle *req) struct stmt *stmt; struct request_exec 
request = { 0 }; int tuple_format; - uint64_t req_id; int rv; switch (req->schema) { @@ -513,10 +519,11 @@ static int handle_exec(struct gateway *g, struct handle *req) } req->stmt_id = stmt->id; g->req = req; - req_id = idNext(&g->random_state); - rv = leader__exec(g->leader, &g->exec, stmt->stmt, req_id, - leader_exec_cb); - if (rv != 0) { + rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, leader_exec_cb); + if (rv == LEADER_NOT_ASYNC) { + /* XXX */ + leader_exec_cb(&g->exec, g->exec.status); + } else if (rv != 0) { tracef("handle exec leader exec failed %d", rv); g->req = NULL; return rv; @@ -600,10 +607,8 @@ static void query_batch(struct gateway *g) #endif } -static void query_barrier_cb(struct barrier *barrier, int status) +static void query_bottom_half(struct gateway *g, int status) { - tracef("query barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -620,6 +625,13 @@ static void query_barrier_cb(struct barrier *barrier, int status) query_batch(g); } +static void query_barrier_cb(struct barrier *barrier, int status) +{ + tracef("query barrier cb status:%d", status); + struct gateway *g = barrier->data; + query_bottom_half(g, status); +} + static void leaderModifyingQueryCb(struct exec *exec, int status) { struct gateway *g = exec->data; @@ -646,7 +658,6 @@ static int handle_query(struct gateway *g, struct handle *req) struct request_query request = { 0 }; int tuple_format; bool is_readonly; - uint64_t req_id; int rv; switch (req->schema) { @@ -684,11 +695,18 @@ static int handle_query(struct gateway *g, struct handle *req) is_readonly = (bool)sqlite3_stmt_readonly(stmt->stmt); if (is_readonly) { - rv = leader__barrier(g->leader, &g->barrier, query_barrier_cb); + rv = leader_barrier_v2(g->leader, &g->barrier, query_barrier_cb); + if (rv == LEADER_NOT_ASYNC) { + query_bottom_half(g, 0); + rv = 0; + } } else { - req_id = idNext(&g->random_state); - rv = 
leader__exec(g->leader, &g->exec, stmt->stmt, req_id, - leaderModifyingQueryCb); + rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, + leaderModifyingQueryCb); + if (rv == LEADER_NOT_ASYNC) { + /* XXX */ + leaderModifyingQueryCb(&g->exec, g->exec.status); + } } if (rv != 0) { g->req = NULL; @@ -813,10 +831,8 @@ static void handle_exec_sql_next(struct gateway *g, g->req = NULL; } -static void execSqlBarrierCb(struct barrier *barrier, int status) +static void exec_sql_bottom_half(struct gateway *g, int status) { - tracef("exec sql barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -829,6 +845,13 @@ static void execSqlBarrierCb(struct barrier *barrier, int status) handle_exec_sql_next(g, req, false); } +static void exec_sql_barrier_cb(struct barrier *barrier, int status) +{ + tracef("exec sql barrier cb status:%d", status); + struct gateway *g = barrier->data; + exec_sql_bottom_half(g, status); +} + static int handle_exec_sql(struct gateway *g, struct handle *req) { tracef("handle exec sql schema:%" PRIu8, req->schema); @@ -856,8 +879,10 @@ static int handle_exec_sql(struct gateway *g, struct handle *req) req->sql = request.sql; req->exec_count = 0; g->req = req; - rc = leader__barrier(g->leader, &g->barrier, execSqlBarrierCb); - if (rc != 0) { + rc = leader_barrier_v2(g->leader, &g->barrier, exec_sql_barrier_cb); + if (rc == LEADER_NOT_ASYNC) { + exec_sql_bottom_half(g, 0); + } else if (rc != 0) { tracef("handle exec sql barrier failed %d", rc); g->req = NULL; return rc; @@ -884,10 +909,8 @@ static void leaderModifyingQuerySqlCb(struct exec *exec, int status) } } -static void querySqlBarrierCb(struct barrier *barrier, int status) +static void query_sql_bottom_half(struct gateway *g, int status) { - tracef("query sql barrier cb status:%d", status); - struct gateway *g = barrier->data; struct handle *req = g->req; assert(req != NULL); g->req = NULL; @@ -898,7 +921,6 @@ static void 
querySqlBarrierCb(struct barrier *barrier, int status) sqlite3_stmt *tail_stmt; int tuple_format; bool is_readonly; - uint64_t req_id; int rv; if (status != 0) { @@ -953,10 +975,12 @@ static void querySqlBarrierCb(struct barrier *barrier, int status) if (is_readonly) { query_batch(g); } else { - req_id = idNext(&g->random_state); - rv = leader__exec(g->leader, &g->exec, stmt, req_id, - leaderModifyingQuerySqlCb); - if (rv != 0) { + rv = leader_exec_v2(g->leader, &g->exec, stmt, + leaderModifyingQuerySqlCb); + if (rv == LEADER_NOT_ASYNC) { + /* XXX */ + leaderModifyingQuerySqlCb(&g->exec, g->exec.status); + } else if (rv != 0) { sqlite3_finalize(stmt); g->req = NULL; failure(req, rv, "leader exec"); @@ -964,6 +988,13 @@ static void querySqlBarrierCb(struct barrier *barrier, int status) } } +static void query_sql_barrier_cb(struct barrier *barrier, int status) +{ + tracef("query sql barrier cb status:%d", status); + struct gateway *g = barrier->data; + query_sql_bottom_half(g, status); +} + static int handle_query_sql(struct gateway *g, struct handle *req) { tracef("handle query sql schema:%" PRIu8, req->schema); @@ -988,8 +1019,10 @@ static int handle_query_sql(struct gateway *g, struct handle *req) FAIL_IF_CHECKPOINTING; req->sql = request.sql; g->req = req; - rv = leader__barrier(g->leader, &g->barrier, querySqlBarrierCb); - if (rv != 0) { + rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb); + if (rv == LEADER_NOT_ASYNC) { + query_sql_bottom_half(g, 0); + } else if (rv != 0) { tracef("handle query sql barrier failed %d", rv); g->req = NULL; return rv; diff --git a/src/leader.c b/src/leader.c index 61ef91639..6e1d5ded6 100644 --- a/src/leader.c +++ b/src/leader.c @@ -16,17 +16,6 @@ #include "utils.h" #include "vfs.h" -/* Called when a leader exec request terminates and the associated callback can - * be invoked. 
*/ -static void leaderExecDone(struct exec *req) -{ - tracef("leader exec done id:%" PRIu64, req->id); - req->leader->exec = NULL; - if (req->cb != NULL) { - req->cb(req, req->status); - } -} - /* Open a SQLite connection and set it to leader replication mode. */ static int openConnection(const char *filename, const char *vfs, @@ -146,6 +135,8 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft) return 0; } +static void exec_done(struct exec *, int); + void leader__close(struct leader *l) { tracef("leader close"); @@ -154,7 +145,7 @@ void leader__close(struct leader *l) if (l->exec != NULL) { assert(l->inflight == NULL); l->exec->status = SQLITE_ERROR; - leaderExecDone(l->exec); + exec_done(l->exec, 0); } rc = sqlite3_close(l->conn); assert(rc == 0); @@ -241,66 +232,10 @@ static void leaderMaybeCheckpointLegacy(struct leader *l) raft_free(buf.base); } -static void leaderApplyFramesCb(struct raft_apply *req, - int status, - void *result) -{ - tracef("apply frames cb id:%" PRIu64, idExtract(req->req_id)); - struct apply *apply = req->data; - struct leader *l = apply->leader; - if (l == NULL) { - raft_free(apply); - return; - } - - (void)result; - - if (status != 0) { - tracef("apply frames cb failed status %d", status); - sqlite3_vfs *vfs = sqlite3_vfs_find(l->db->config->name); - switch (status) { - case RAFT_LEADERSHIPLOST: - l->exec->status = SQLITE_IOERR_LEADERSHIP_LOST; - break; - case RAFT_NOSPACE: - l->exec->status = SQLITE_IOERR_WRITE; - break; - case RAFT_SHUTDOWN: - /* If we got here it means we have manually - * fired the apply callback from - * gateway__close(). In this case we don't - * free() the apply object, since it will be - * freed when the callback is fired again by - * raft. - * - * TODO: we should instead make gatewa__close() - * itself asynchronous. 
*/ - apply->leader = NULL; - l->exec->status = SQLITE_ABORT; - goto finish; - break; - default: - l->exec->status = SQLITE_IOERR; - break; - } - VfsAbort(vfs, l->db->path); - } - - raft_free(apply); - - if (status == 0) { - leaderMaybeCheckpointLegacy(l); - } - -finish: - l->inflight = NULL; - l->db->tx_id = 0; - leaderExecDone(l->exec); -} - static int leaderApplyFrames(struct exec *req, dqlite_vfs_frame *frames, - unsigned n) + unsigned n, + raft_apply_cb cb) { tracef("leader apply frames id:%" PRIu64, req->id); struct leader *l = req->leader; @@ -337,11 +272,11 @@ static int leaderApplyFrames(struct exec *req, idSet(apply->req.req_id, req->id); #ifdef USE_SYSTEM_RAFT - rv = raft_apply(l->raft, &apply->req, &buf, 1, leaderApplyFramesCb); + rv = raft_apply(l->raft, &apply->req, &buf, 1, cb); #else /* TODO actual WAL slice goes here */ struct raft_entry_local_data local_data = {}; - rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, leaderApplyFramesCb); + rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, cb); #endif if (rv != 0) { tracef("raft apply failed %d", rv); @@ -362,9 +297,204 @@ static int leaderApplyFrames(struct exec *req, return rv; } -static void leaderExecV2(struct exec *req, enum pool_half half) +enum { + BARRIER_START, + BARRIER_PASSED, + BARRIER_DONE, + BARRIER_FAIL, + BARRIER_NR, +}; + +static const struct sm_conf barrier_states[BARRIER_NR] = { + [BARRIER_START] = { + .name = "start", + .allowed = BITS(BARRIER_PASSED) + |BITS(BARRIER_DONE) + |BITS(BARRIER_FAIL), + .flags = SM_INITIAL, + }, + [BARRIER_PASSED] = { + .name = "passed", + .allowed = BITS(BARRIER_DONE) + |BITS(BARRIER_FAIL), + }, + [BARRIER_DONE] = { + .name = "done", + .flags = SM_FINAL, + }, + [BARRIER_FAIL] = { + .name = "fail", + .flags = SM_FINAL|SM_FAILURE, + }, +}; + +static bool barrier_invariant(const struct sm *sm, int prev) +{ + (void)sm; + (void)prev; + return true; +} + +static void barrier_done(struct barrier *barrier, int status) +{ + PRE(barrier != 
NULL); + int state = sm_state(&barrier->sm); + PRE(state == BARRIER_START || state == BARRIER_PASSED); + void (*cb)(struct barrier *, int) = barrier->cb; + PRE(cb != NULL); + + if (status != 0) { + sm_fail(&barrier->sm, BARRIER_FAIL, status); + } else { + sm_move(&barrier->sm, BARRIER_DONE); + } + sm_fini(&barrier->sm); + /* TODO(cole) uncomment this once the barrier-callback-runs-twice + * issue is fixed. */ + /* barrier->req.data = NULL; */ + barrier->leader = NULL; + barrier->cb = NULL; + + if (state == BARRIER_PASSED) { + cb(barrier, status); + } +} + +static void barrier_raft_cb(struct raft_barrier *, int); + +static int barrier_async(struct barrier *barrier, int status) +{ + int rv; + + if (sm_state(&barrier->sm) == BARRIER_START) { + PRE(status == 0); + rv = raft_barrier(barrier->leader->raft, &barrier->req, barrier_raft_cb); + if (rv != 0) { + barrier_done(barrier, rv); + } + return rv; + } + + PRE(sm_state(&barrier->sm) == BARRIER_PASSED); + status = status == 0 ? 0 : + status == RAFT_LEADERSHIPLOST ? SQLITE_IOERR_LEADERSHIP_LOST : + SQLITE_ERROR; + barrier_done(barrier, status); + return 0; +} + +static void barrier_raft_cb(struct raft_barrier *rb, int status) +{ + struct barrier *barrier = rb->data; + PRE(barrier != NULL); + /* TODO(cole) it seems that raft can invoke this callback more than + * once, investigate and fix that and then remove this workaround. 
*/ + if (sm_state(&barrier->sm) > BARRIER_START) { + return; + } + sm_move(&barrier->sm, BARRIER_PASSED); + (void)barrier_async(rb->data, status); +} + +int leader_barrier_v2(struct leader *l, + struct barrier *barrier, + barrier_cb cb) +{ + int rv; + + if (!needsBarrier(l)) { + return LEADER_NOT_ASYNC; + } + + sm_init(&barrier->sm, barrier_invariant, NULL, barrier_states, "barrier", + BARRIER_START); + barrier->cb = cb; + barrier->leader = l; + barrier->req.data = barrier; + rv = barrier_async(barrier, 0); + POST(rv != LEADER_NOT_ASYNC); + return rv; +} + +enum { + EXEC_START, + EXEC_BARRIER, + EXEC_STEPPED, + EXEC_POLLED, + EXEC_APPLIED, + EXEC_DONE, + EXEC_FAILED, + EXEC_NR, +}; + +static const struct sm_conf exec_states[EXEC_NR] = { + [EXEC_START] = { + .name = "start", + .allowed = BITS(EXEC_BARRIER) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + .flags = SM_INITIAL, + }, + [EXEC_BARRIER] = { + .name = "barrier", + .allowed = BITS(EXEC_STEPPED) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_STEPPED] = { + .name = "stepped", + .allowed = BITS(EXEC_POLLED) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_POLLED] = { + .name = "polled", + .allowed = BITS(EXEC_APPLIED) + |BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_APPLIED] = { + .name = "applied", + .allowed = BITS(EXEC_FAILED) + |BITS(EXEC_DONE), + }, + [EXEC_DONE] = { + .name = "done", + .flags = SM_FINAL, + }, + [EXEC_FAILED] = { + .name = "failed", + .flags = SM_FAILURE|SM_FINAL, + }, +}; + +static bool exec_invariant(const struct sm *sm, int prev) +{ + (void)sm; + (void)prev; + return true; +} + +static void exec_done(struct exec *req, int asyncness) +{ + int status = req->status; + status = status ? 
status : SQLITE_ERROR; + if (status == SQLITE_DONE) { + sm_move(&req->sm, EXEC_DONE); + } else { + sm_fail(&req->sm, EXEC_FAILED, status); + } + sm_fini(&req->sm); + req->leader->exec = NULL; + if (req->cb != NULL && asyncness == 0) { + req->cb(req, status); + } +} + +static void exec_apply_cb(struct raft_apply *, int, void *); + +static int exec_apply(struct exec *req) { - tracef("leader exec v2 id:%" PRIu64, req->id); struct leader *l = req->leader; struct db *db = l->db; sqlite3_vfs *vfs = sqlite3_vfs_find(db->config->name); @@ -374,159 +504,175 @@ static void leaderExecV2(struct exec *req, enum pool_half half) unsigned i; int rv; - if (half == POOL_TOP_HALF) { - req->status = sqlite3_step(req->stmt); - return; - } /* else POOL_BOTTOM_HALF => */ + req->status = sqlite3_step(req->stmt); + sm_move(&req->sm, EXEC_STEPPED); rv = VfsPoll(vfs, db->path, &frames, &n); - if (rv != 0 || n == 0) { - tracef("vfs poll"); - goto finish; + if (rv != 0) { + return rv; + } + sm_move(&req->sm, EXEC_POLLED); + if (n == 0) { + return LEADER_NOT_ASYNC; } /* Check if the new frames would create an overfull database */ size = VfsDatabaseSize(vfs, db->path, n, db->config->page_size); if (size > VfsDatabaseSizeLimit(vfs)) { rv = SQLITE_FULL; - goto abort; + goto err; } - rv = leaderApplyFrames(req, frames, n); + rv = leaderApplyFrames(req, frames, n, exec_apply_cb); if (rv != 0) { - goto abort; - } - - for (i = 0; i < n; i++) { - sqlite3_free(frames[i].data); + goto err; } - sqlite3_free(frames); - return; -abort: +err: for (i = 0; i < n; i++) { sqlite3_free(frames[i].data); } sqlite3_free(frames); - VfsAbort(vfs, l->db->path); -finish: if (rv != 0) { - tracef("exec v2 failed %d", rv); - l->exec->status = rv; + VfsAbort(vfs, l->db->path); } - leaderExecDone(l->exec); + return rv; } -#ifdef DQLITE_NEXT +static int exec_async(struct exec *, int); -static void exec_top(pool_work_t *w) +static void exec_apply_cb(struct raft_apply *req, + int status, + void *result) { - struct exec *req = 
CONTAINER_OF(w, struct exec, work); - leaderExecV2(req, POOL_TOP_HALF); + (void)result; + struct apply *apply = req->data; + struct leader *l; + struct exec *exec; + + l = apply->leader; + if (l == NULL) { + raft_free(apply); + return; + } + + exec = l->exec; + PRE(exec != NULL); + sm_move(&exec->sm, EXEC_APPLIED); + if (status == RAFT_SHUTDOWN) { + apply->leader = NULL; + } else { + raft_free(apply); + } + exec_async(exec, status); } -static void exec_bottom(pool_work_t *w) +static int exec_status(int r) { - struct exec *req = CONTAINER_OF(w, struct exec, work); - leaderExecV2(req, POOL_BOTTOM_HALF); + PRE(r != 0); + return r == RAFT_LEADERSHIPLOST ? SQLITE_IOERR_LEADERSHIP_LOST : + r == RAFT_NOSPACE ? SQLITE_IOERR_WRITE : + r == RAFT_SHUTDOWN ? SQLITE_ABORT : + SQLITE_IOERR; } -#endif - -static void execBarrierCb(struct barrier *barrier, int status) +static void exec_barrier_cb(struct barrier *barrier, int status) { - tracef("exec barrier cb status %d", status); struct exec *req = barrier->data; - struct leader *l = req->leader; + PRE(req != NULL); + sm_move(&req->sm, EXEC_BARRIER); + exec_async(req, status); +} - if (status != 0) { - l->exec->status = status; - leaderExecDone(l->exec); - return; +/** + * Exec request pseudo-coroutine, encapsulating the whole lifecycle. 
+ */ +int exec_async(struct exec *req, int status) +{ + struct leader *l; + sqlite3_vfs *vfs; + int barrier_rv = 0; + int apply_rv = 0; + int ret = 0; + + switch (sm_state(&req->sm)) { + case EXEC_START: + PRE(status == 0); + l = req->leader; + PRE(l != NULL); + barrier_rv = leader_barrier_v2(l, &req->barrier, exec_barrier_cb); + ret = barrier_rv; + if (barrier_rv == 0) { + break; + } else if (barrier_rv != LEADER_NOT_ASYNC) { + l->exec = NULL; + break; + } /* else barrier_rv == LEADER_NOT_ASYNC => */ + sm_move(&req->sm, EXEC_BARRIER); + POST(status == 0); + /* fallthrough */ + case EXEC_BARRIER: + if (status != 0) { + req->status = status; + exec_done(req, ret); + break; + } + apply_rv = exec_apply(req); + if (apply_rv == 0) { + ret = 0; + break; + } else if (apply_rv != LEADER_NOT_ASYNC) { + req->status = apply_rv; + exec_done(req, ret); + ret = 0; + break; + } /* else apply_rv == LEADER_NOT_ASYNC => */ + ret &= LEADER_NOT_ASYNC; + sm_move(&req->sm, EXEC_APPLIED); + POST(status == 0); + /* fallthrough */ + case EXEC_APPLIED: + l = req->leader; + PRE(l != NULL); + vfs = sqlite3_vfs_find(l->db->config->name); + PRE(vfs != NULL); + if (apply_rv == 0) { + if (status == 0) { + leaderMaybeCheckpointLegacy(l); + } else { + req->status = exec_status(status); + VfsAbort(vfs, l->db->path); + } + l->inflight = NULL; + l->db->tx_id = 0; + } + exec_done(req, ret); + break; + default: + POST(false && "impossible!"); } -#ifdef DQLITE_NEXT - struct dqlite_node *node = l->raft->data; - pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT) - ? 
pool_ut_fallback() : &node->pool; - pool_queue_work(pool, &req->work, l->db->cookie, - WT_UNORD, exec_top, exec_bottom); -#else - leaderExecV2(req, POOL_TOP_HALF); - leaderExecV2(req, POOL_BOTTOM_HALF); -#endif + return ret; } -int leader__exec(struct leader *l, - struct exec *req, - sqlite3_stmt *stmt, - uint64_t id, - exec_cb cb) +int leader_exec_v2(struct leader *l, + struct exec *req, + sqlite3_stmt *stmt, + exec_cb cb) { - tracef("leader exec id:%" PRIu64, id); - int rv; if (l->exec != NULL) { - tracef("busy"); return SQLITE_BUSY; } l->exec = req; + sm_init(&req->sm, exec_invariant, NULL, exec_states, "exec", + EXEC_START); req->leader = l; req->stmt = stmt; - req->id = id; req->cb = cb; req->barrier.data = req; req->barrier.cb = NULL; req->work = (pool_work_t){}; - rv = leader__barrier(l, &req->barrier, execBarrierCb); - if (rv != 0) { - l->exec = NULL; - return rv; - } - return 0; -} - -static void raftBarrierCb(struct raft_barrier *req, int status) -{ - tracef("raft barrier cb status %d", status); - struct barrier *barrier = req->data; - int rv = 0; - if (status != 0) { - if (status == RAFT_LEADERSHIPLOST) { - rv = SQLITE_IOERR_LEADERSHIP_LOST; - } else { - rv = SQLITE_ERROR; - } - } - barrier_cb cb = barrier->cb; - if (cb == NULL) { - tracef("barrier cb already fired"); - return; - } - barrier->cb = NULL; - cb(barrier, rv); -} - -int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb) -{ - tracef("leader barrier"); - int rv; - if (!needsBarrier(l)) { - tracef("not needed"); - cb(barrier, 0); - return 0; - } - barrier->cb = cb; - barrier->leader = l; - barrier->req.data = barrier; - rv = raft_barrier(l->raft, &barrier->req, raftBarrierCb); - if (rv != 0) { - tracef("raft barrier failed %d", rv); - barrier->req.data = NULL; - barrier->leader = NULL; - barrier->cb = NULL; - return rv; - } - return 0; + return exec_async(req, 0); } diff --git a/src/leader.h b/src/leader.h index 9d022d3e9..2a078755d 100644 --- a/src/leader.h +++ 
b/src/leader.h @@ -8,14 +8,17 @@ #include #include -#include "./lib/queue.h" #include "db.h" +#include "lib/queue.h" +#include "lib/sm.h" #include "lib/threadpool.h" #include "raft.h" #define SQLITE_IOERR_NOT_LEADER (SQLITE_IOERR | (40 << 8)) #define SQLITE_IOERR_LEADERSHIP_LOST (SQLITE_IOERR | (41 << 8)) +#define LEADER_NOT_ASYNC INT_MAX + struct exec; struct barrier; struct leader; @@ -47,6 +50,7 @@ struct leader { struct barrier { void *data; + struct sm sm; struct leader *leader; struct raft_barrier req; barrier_cb cb; @@ -57,6 +61,7 @@ struct barrier { */ struct exec { void *data; + struct sm sm; struct leader *leader; struct barrier barrier; sqlite3_stmt *stmt; @@ -79,32 +84,39 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft); void leader__close(struct leader *l); /** - * Submit a request to step a SQLite statement. + * Submit a raft barrier request if there is no transaction in progress on the + * underlying database and the FSM is behind the last log index. + * + * The callback will only be invoked asynchronously: if no barrier is needed, + * this function will return without invoking the callback. * - * The request will be dispatched to the leader loop coroutine, which will be - * resumed and will invoke sqlite_step(). If the statement triggers the - * replication hooks and one or more new Raft log entries need to be appended, - * then the loop coroutine will be paused and control will be transferred back - * to the main coroutine. In this state the leader loop coroutine call stack - * will be "blocked" on the xFrames() replication hook call triggered by the top - * sqlite_step() call. The leader loop coroutine will be resumed once the Raft - * append request completes (either successfully or not) and at that point the - * stack will rewind back to the @sqlite_step() call, returning to the leader - * loop which will then have completed the request and transfer control back to - * the main coroutine, pausing until the next request. 
+ * Returns 0 if the callback was scheduled successfully or LEADER_NOT_ASYNC + * if no barrier is needed. Any other value indicates an error. */ -int leader__exec(struct leader *l, - struct exec *req, - sqlite3_stmt *stmt, - uint64_t id, - exec_cb cb); +int leader_barrier_v2(struct leader *l, + struct barrier *barrier, + barrier_cb cb); /** - * Submit a raft barrier request if there is no transaction in progress in the - * underlying database and the FSM is behind the last log index. + * Submit a request to step a SQLite statement. + * + * This is an asynchronous operation in general. It can yield to the event + * loop at two points: + * + * - When running the preliminary barrier (see leader_barrier_v2). This + * is skipped if no barrier is necessary. + * - When replicating the transaction in raft. This is skipped if the + * statement doesn't generate any changed pages. * - * Otherwise, just invoke the given @cb immediately. + * If both of these yields are skipped, this function returns LEADER_NOT_ASYNC + * and does not invoke the callback. In this case the caller must examine + * `req->status` to determine whether the exec was successful. Otherwise, + * this function returns 0 if it successfully scheduled the callback and + * yielded, or any other value to indicate an error. */ -int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb); +int leader_exec_v2(struct leader *l, + struct exec *req, + sqlite3_stmt *stmt, + exec_cb cb); #endif /* LEADER_H_*/ diff --git a/test/unit/test_gateway.c b/test/unit/test_gateway.c index d0e1fb7c6..8539c3ee2 100644 --- a/test/unit/test_gateway.c +++ b/test/unit/test_gateway.c @@ -567,7 +567,7 @@ TEST_CASE(prepare, barrier_error, NULL) f->request.db_id = 0; f->request.sql = "SELECT n FROM test"; ENCODE(&f->request, prepare); - /* We rely on leader__barrier (called by handle_prepare) attempting + /* We rely on leader_barrier_v2 (called by handle_prepare) attempting * an allocation using raft_malloc. 
*/ test_raft_heap_fault_config(0, 1); test_raft_heap_fault_enable(); @@ -918,8 +918,11 @@ TEST_CASE(exec, close_while_in_flight, NULL) EXEC("INSERT INTO test(n) VALUES(1)"); } + /* Trigger a second page cache flush to the WAL, and abort before it's * done. */ + /* FIXME(cole) it seems that this may no longer be successfully triggering + * the page cache flush */ EXEC_SQL_SUBMIT("INSERT INTO test(n) VALUES(1)"); return MUNIT_OK; } @@ -1869,7 +1872,7 @@ TEST_CASE(exec_sql, barrier_error, NULL) f->request.db_id = 0; f->request.sql = "INSERT INTO test VALUES(123)"; ENCODE(&f->request, exec_sql); - /* We rely on leader__barrier (called by handle_exec_sql) attempting + /* We rely on leader_barrier_v2 (called by handle_exec_sql) attempting * an allocation using raft_malloc. */ test_raft_heap_fault_config(0, 1); test_raft_heap_fault_enable(); @@ -2243,7 +2246,7 @@ TEST_CASE(query_sql, barrier_error, NULL) f->request.db_id = 0; f->request.sql = "SELECT n FROM test"; ENCODE(&f->request, query_sql); - /* We rely on leader__barrier (called by handle_query_sql) attempting + /* We rely on leader_barrier_v2 (called by handle_query_sql) attempting * an allocation using raft_malloc. */ test_raft_heap_fault_config(0, 1); test_raft_heap_fault_enable(); diff --git a/test/unit/test_replication.c b/test/unit/test_replication.c index 00fdd9ba7..dfd0fce68 100644 --- a/test/unit/test_replication.c +++ b/test/unit/test_replication.c @@ -88,12 +88,16 @@ TEST_MODULE(replication_v1); } /* Submit an exec request using the I'th leader. */ -#define EXEC(I) \ - { \ - int rc2; \ - rc2 = leader__exec(LEADER(I), &f->req, f->stmt, 0, \ - fixture_exec_cb); \ - munit_assert_int(rc2, ==, 0); \ +#define EXEC(I) \ + { \ + int rc2; \ + rc2 = leader_exec_v2(LEADER(I), &f->req, f->stmt, \ + fixture_exec_cb); \ + if (rc2 == LEADER_NOT_ASYNC) { \ + fixture_exec_cb(&f->req, f->req.status); \ + } else { \ + munit_assert_int(rc2, ==, 0); \ + } \ } /* Convenience to prepare, execute and finalize a statement. 
*/ @@ -167,7 +171,7 @@ TEST_CASE(init, conn, NULL) /****************************************************************************** * - * leader__exec + * leader_exec_v2 * ******************************************************************************/ @@ -349,33 +353,41 @@ static void execCb(struct exec *req, int status) f->status = status; } +static void fixture_exec(struct fixture *f, unsigned i) +{ + int rv; + + rv = leader_exec_v2(LEADER(i), &f->req, f->stmt, execCb); + if (rv == LEADER_NOT_ASYNC) { + execCb(&f->req, f->req.status); + return; + } + munit_assert_int(rv, ==, 0); +} + TEST(replication, exec, setUp, tearDown, 0, NULL) { struct fixture *f = data; - int rv; CLUSTER_ELECT(0); PREPARE(0, "BEGIN"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); + fixture_exec(f, 0); CLUSTER_APPLIED(3); - munit_assert_int(rv, ==, 0); munit_assert_true(f->invoked); munit_assert_int(f->status, ==, SQLITE_DONE); f->invoked = false; FINALIZE; PREPARE(0, "CREATE TABLE test (a INT)"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); munit_assert_true(f->invoked); munit_assert_int(f->status, ==, SQLITE_DONE); f->invoked = false; FINALIZE; PREPARE(0, "COMMIT"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); munit_assert_false(f->invoked); FINALIZE; @@ -400,21 +412,18 @@ TEST(replication, checkpoint, setUp, tearDown, 0, NULL) { struct fixture *f = data; struct config *config = CLUSTER_CONFIG(0); - int rv; config->checkpoint_threshold = 3; CLUSTER_ELECT(0); PREPARE(0, "CREATE TABLE test (n INT)"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); CLUSTER_APPLIED(4); FINALIZE; PREPARE(0, "INSERT INTO test(n) VALUES(1)"); - rv = leader__exec(LEADER(0), &f->req, f->stmt, 0, execCb); - munit_assert_int(rv, ==, 0); + fixture_exec(f, 0); CLUSTER_APPLIED(6); FINALIZE; From 
f8a5aee97dc9480e8b5a321677efafd14a59f8a5 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 2 Sep 2024 12:09:41 -0400 Subject: [PATCH 2/3] Rewrite handle_exec_sql_next Signed-off-by: Cole Miller --- src/gateway.c | 96 +++++++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/src/gateway.c b/src/gateway.c index 19ae9e709..2eb9b7948 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -761,66 +761,66 @@ static void handle_exec_sql_next(struct gateway *g, bool done) { tracef("handle exec sql next"); + struct leader *l = g->leader; + PRE(l != NULL); struct cursor *cursor = &req->cursor; - struct response_result response = { 0 }; - sqlite3_stmt *stmt = NULL; + int schema = req->schema; + struct response_result response = {}; + sqlite3_stmt *stmt; + const char *sql; const char *tail; int tuple_format; - uint64_t req_id; int rv; - if (req->sql == NULL || strcmp(req->sql, "") == 0) { - goto success; - } - - /* stmt will be set to NULL by sqlite when an error occurs. */ - assert(g->leader != NULL); - rv = sqlite3_prepare_v2(g->leader->conn, req->sql, -1, &stmt, &tail); - if (rv != SQLITE_OK) { - tracef("exec sql prepare failed %d", rv); - failure(req, rv, sqlite3_errmsg(g->leader->conn)); - goto done; - } - - if (stmt == NULL) { - goto success; - } - - if (!done) { - switch (req->schema) { - case DQLITE_REQUEST_PARAMS_SCHEMA_V0: - tuple_format = TUPLE__PARAMS; - break; - case DQLITE_REQUEST_PARAMS_SCHEMA_V1: - tuple_format = TUPLE__PARAMS32; - break; - default: - /* Should have been caught by handle_exec_sql */ - assert(0); + tuple_format = schema == DQLITE_REQUEST_PARAMS_SCHEMA_V0 ? + TUPLE__PARAMS : + schema == DQLITE_REQUEST_PARAMS_SCHEMA_V1 ? 
+ TUPLE__PARAMS32 : + (POST(false && "impossible"), 0); + + for (;;) { + stmt = NULL; + sql = req->sql; + if (sql == NULL || strcmp(sql, "") == 0) { + goto success; } - rv = bind__params(stmt, cursor, tuple_format); + /* stmt will be set to NULL by sqlite when an error occurs. */ + rv = sqlite3_prepare_v2(l->conn, sql, -1, &stmt, &tail); if (rv != SQLITE_OK) { - failure(req, rv, "bind parameters"); - goto done_after_prepare; + tracef("exec sql prepare failed %d", rv); + failure(req, rv, sqlite3_errmsg(l->conn)); + goto done; + } + if (stmt == NULL) { + goto success; + } + if (!done) { + rv = bind__params(stmt, cursor, tuple_format); + if (rv != SQLITE_OK) { + failure(req, rv, "bind parameters"); + goto done_after_prepare; + } } - } - - req->sql = tail; - g->req = req; - req_id = idNext(&g->random_state); - /* At this point, leader__exec takes ownership of stmt */ - rv = - leader__exec(g->leader, &g->exec, stmt, req_id, handle_exec_sql_cb); - if (rv != SQLITE_OK) { - failure(req, rv, sqlite3_errmsg(g->leader->conn)); - goto done_after_prepare; + req->sql = tail; + g->req = req; + rv = leader_exec_v2(g->leader, &g->exec, stmt, handle_exec_sql_cb); + if (rv == 0) { + return; + } else if (rv != LEADER_NOT_ASYNC) { + failure(req, rv, sqlite3_errmsg(l->conn)); + goto done_after_prepare; + } else if (g->exec.status != SQLITE_DONE) { + /* XXX */ + failure(req, g->exec.status, sqlite3_errmsg(l->conn)); + goto done_after_prepare; + } + done = true; + sqlite3_finalize(stmt); + req->exec_count++; } - return; - success: - tracef("handle exec sql next success"); if (req->exec_count > 0) { fill_result(g, &response); } From 5f139a222313e1c9247c1b06859b82fc010b5d61 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 2 Sep 2024 12:13:01 -0400 Subject: [PATCH 3/3] Add a regression test Signed-off-by: Cole Miller --- test/integration/test_client.c | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/integration/test_client.c 
b/test/integration/test_client.c index 98fccd428..c2a1cfec2 100644 --- a/test/integration/test_client.c +++ b/test/integration/test_client.c @@ -145,3 +145,29 @@ TEST(client, querySql, setUp, tearDown, 0, client_params) return MUNIT_OK; } + +/* Stress test of an EXEC_SQL with many ';'-separated statements. */ +TEST(client, semicolons, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + (void)params; + + static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);"; + + size_t n = 1000; + size_t unit = sizeof(trivial_stmt) - 1; + char *sql = munit_malloc(n * unit); + char *p = sql; + for (size_t i = 0; i < n; i++) { + memcpy(p, trivial_stmt, unit); + p += unit; + } + sql[n * unit - 1] = '\0'; + + uint64_t last_insert_id; + uint64_t rows_affected; + EXEC_SQL(sql, &last_insert_id, &rows_affected); + + free(sql); + return MUNIT_OK; +}