Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor leader.c to fix stack growth in handle_exec_sql #697

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 114 additions & 81 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,8 @@
return 0;
}

static void prepareBarrierCb(struct barrier *barrier, int status)
static void prepare_bottom_half(struct gateway *g, int status)
{
tracef("prepare barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
struct response_stmt response_v0 = { 0 };
struct response_stmt_with_offset response_v1 = { 0 };
Expand Down Expand Up @@ -381,6 +379,13 @@
}
}

static void prepare_barrier_cb(struct barrier *barrier, int status)
{
tracef("prepare barrier cb status:%d", status);
struct gateway *g = barrier->data;
prepare_bottom_half(g, status);
}

static int handle_prepare(struct gateway *g, struct handle *req)
{
tracef("handle prepare");
Expand Down Expand Up @@ -413,8 +418,10 @@
req->stmt_id = stmt->id;
req->sql = request.sql;
g->req = req;
rc = leader__barrier(g->leader, &g->barrier, prepareBarrierCb);
if (rc != 0) {
rc = leader_barrier_v2(g->leader, &g->barrier, prepare_barrier_cb);
if (rc == LEADER_NOT_ASYNC) {
prepare_bottom_half(g, 0);
} else if (rc != 0) {
tracef("handle prepare barrier failed %d", rc);
stmt__registry_del(&g->stmts, stmt);
g->req = NULL;
Expand Down Expand Up @@ -478,7 +485,6 @@
struct stmt *stmt;
struct request_exec request = { 0 };
int tuple_format;
uint64_t req_id;
int rv;

switch (req->schema) {
Expand Down Expand Up @@ -513,10 +519,11 @@
}
req->stmt_id = stmt->id;
g->req = req;
req_id = idNext(&g->random_state);
rv = leader__exec(g->leader, &g->exec, stmt->stmt, req_id,
leader_exec_cb);
if (rv != 0) {
rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt, leader_exec_cb);
if (rv == LEADER_NOT_ASYNC) {
/* XXX */
leader_exec_cb(&g->exec, g->exec.status);
} else if (rv != 0) {
tracef("handle exec leader exec failed %d", rv);
g->req = NULL;
return rv;
Expand Down Expand Up @@ -600,10 +607,8 @@
#endif
}

static void query_barrier_cb(struct barrier *barrier, int status)
static void query_bottom_half(struct gateway *g, int status)
{
tracef("query barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
Expand All @@ -620,6 +625,13 @@
query_batch(g);
}

static void query_barrier_cb(struct barrier *barrier, int status)

Check warning on line 628 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L628

Added line #L628 was not covered by tests
{
tracef("query barrier cb status:%d", status);
struct gateway *g = barrier->data;
query_bottom_half(g, status);

Check warning on line 632 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L631-L632

Added lines #L631 - L632 were not covered by tests
}

static void leaderModifyingQueryCb(struct exec *exec, int status)
{
struct gateway *g = exec->data;
Expand All @@ -646,7 +658,6 @@
struct request_query request = { 0 };
int tuple_format;
bool is_readonly;
uint64_t req_id;
int rv;

switch (req->schema) {
Expand Down Expand Up @@ -684,11 +695,18 @@

is_readonly = (bool)sqlite3_stmt_readonly(stmt->stmt);
if (is_readonly) {
rv = leader__barrier(g->leader, &g->barrier, query_barrier_cb);
rv = leader_barrier_v2(g->leader, &g->barrier, query_barrier_cb);
if (rv == LEADER_NOT_ASYNC) {
query_bottom_half(g, 0);
rv = 0;
}
} else {
req_id = idNext(&g->random_state);
rv = leader__exec(g->leader, &g->exec, stmt->stmt, req_id,
leaderModifyingQueryCb);
rv = leader_exec_v2(g->leader, &g->exec, stmt->stmt,
leaderModifyingQueryCb);
if (rv == LEADER_NOT_ASYNC) {
/* XXX */
leaderModifyingQueryCb(&g->exec, g->exec.status);

Check warning on line 708 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L708

Added line #L708 was not covered by tests
}
}
if (rv != 0) {
g->req = NULL;
Expand Down Expand Up @@ -743,66 +761,66 @@
bool done)
{
tracef("handle exec sql next");
struct leader *l = g->leader;
PRE(l != NULL);
struct cursor *cursor = &req->cursor;
struct response_result response = { 0 };
sqlite3_stmt *stmt = NULL;
int schema = req->schema;
struct response_result response = {};
sqlite3_stmt *stmt;
const char *sql;
const char *tail;
int tuple_format;
uint64_t req_id;
int rv;

if (req->sql == NULL || strcmp(req->sql, "") == 0) {
goto success;
}

/* stmt will be set to NULL by sqlite when an error occurs. */
assert(g->leader != NULL);
rv = sqlite3_prepare_v2(g->leader->conn, req->sql, -1, &stmt, &tail);
if (rv != SQLITE_OK) {
tracef("exec sql prepare failed %d", rv);
failure(req, rv, sqlite3_errmsg(g->leader->conn));
goto done;
}

if (stmt == NULL) {
goto success;
}

if (!done) {
switch (req->schema) {
case DQLITE_REQUEST_PARAMS_SCHEMA_V0:
tuple_format = TUPLE__PARAMS;
break;
case DQLITE_REQUEST_PARAMS_SCHEMA_V1:
tuple_format = TUPLE__PARAMS32;
break;
default:
/* Should have been caught by handle_exec_sql */
assert(0);
tuple_format = schema == DQLITE_REQUEST_PARAMS_SCHEMA_V0 ?
TUPLE__PARAMS :
schema == DQLITE_REQUEST_PARAMS_SCHEMA_V1 ?
TUPLE__PARAMS32 :
(POST(false && "impossible"), 0);

Check warning on line 779 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L779

Added line #L779 was not covered by tests

for (;;) {
stmt = NULL;
sql = req->sql;
if (sql == NULL || strcmp(sql, "") == 0) {
goto success;
}
rv = bind__params(stmt, cursor, tuple_format);
/* stmt will be set to NULL by sqlite when an error occurs. */
rv = sqlite3_prepare_v2(l->conn, sql, -1, &stmt, &tail);
if (rv != SQLITE_OK) {
failure(req, rv, "bind parameters");
goto done_after_prepare;
tracef("exec sql prepare failed %d", rv);
failure(req, rv, sqlite3_errmsg(l->conn));
goto done;
}
if (stmt == NULL) {
goto success;
}
if (!done) {
rv = bind__params(stmt, cursor, tuple_format);
if (rv != SQLITE_OK) {
failure(req, rv, "bind parameters");
goto done_after_prepare;

Check warning on line 801 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L800-L801

Added lines #L800 - L801 were not covered by tests
}
}
}

req->sql = tail;
g->req = req;

req_id = idNext(&g->random_state);
/* At this point, leader__exec takes ownership of stmt */
rv =
leader__exec(g->leader, &g->exec, stmt, req_id, handle_exec_sql_cb);
if (rv != SQLITE_OK) {
failure(req, rv, sqlite3_errmsg(g->leader->conn));
goto done_after_prepare;
req->sql = tail;
g->req = req;
rv = leader_exec_v2(g->leader, &g->exec, stmt, handle_exec_sql_cb);
if (rv == 0) {
return;
} else if (rv != LEADER_NOT_ASYNC) {
failure(req, rv, sqlite3_errmsg(l->conn));
goto done_after_prepare;

Check warning on line 812 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L811-L812

Added lines #L811 - L812 were not covered by tests
} else if (g->exec.status != SQLITE_DONE) {
/* XXX */
failure(req, g->exec.status, sqlite3_errmsg(l->conn));
goto done_after_prepare;
}
done = true;
sqlite3_finalize(stmt);
req->exec_count++;
}

return;

success:
tracef("handle exec sql next success");
if (req->exec_count > 0) {
fill_result(g, &response);
}
Expand All @@ -813,10 +831,8 @@
g->req = NULL;
}

static void execSqlBarrierCb(struct barrier *barrier, int status)
static void exec_sql_bottom_half(struct gateway *g, int status)
{
tracef("exec sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
Expand All @@ -829,6 +845,13 @@
handle_exec_sql_next(g, req, false);
}

static void exec_sql_barrier_cb(struct barrier *barrier, int status)
{
tracef("exec sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
exec_sql_bottom_half(g, status);
}

static int handle_exec_sql(struct gateway *g, struct handle *req)
{
tracef("handle exec sql schema:%" PRIu8, req->schema);
Expand Down Expand Up @@ -856,8 +879,10 @@
req->sql = request.sql;
req->exec_count = 0;
g->req = req;
rc = leader__barrier(g->leader, &g->barrier, execSqlBarrierCb);
if (rc != 0) {
rc = leader_barrier_v2(g->leader, &g->barrier, exec_sql_barrier_cb);
if (rc == LEADER_NOT_ASYNC) {
exec_sql_bottom_half(g, 0);
} else if (rc != 0) {
tracef("handle exec sql barrier failed %d", rc);
g->req = NULL;
return rc;
Expand All @@ -884,10 +909,8 @@
}
}

static void querySqlBarrierCb(struct barrier *barrier, int status)
static void query_sql_bottom_half(struct gateway *g, int status)
{
tracef("query sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
Expand All @@ -898,7 +921,6 @@
sqlite3_stmt *tail_stmt;
int tuple_format;
bool is_readonly;
uint64_t req_id;
int rv;

if (status != 0) {
Expand Down Expand Up @@ -953,17 +975,26 @@
if (is_readonly) {
query_batch(g);
} else {
req_id = idNext(&g->random_state);
rv = leader__exec(g->leader, &g->exec, stmt, req_id,
leaderModifyingQuerySqlCb);
if (rv != 0) {
rv = leader_exec_v2(g->leader, &g->exec, stmt,
leaderModifyingQuerySqlCb);
if (rv == LEADER_NOT_ASYNC) {
/* XXX */
leaderModifyingQuerySqlCb(&g->exec, g->exec.status);

Check warning on line 982 in src/gateway.c

View check run for this annotation

Codecov / codecov/patch

src/gateway.c#L982

Added line #L982 was not covered by tests
} else if (rv != 0) {
sqlite3_finalize(stmt);
g->req = NULL;
failure(req, rv, "leader exec");
}
}
}

static void query_sql_barrier_cb(struct barrier *barrier, int status)
{
tracef("query sql barrier cb status:%d", status);
struct gateway *g = barrier->data;
query_sql_bottom_half(g, status);
}

static int handle_query_sql(struct gateway *g, struct handle *req)
{
tracef("handle query sql schema:%" PRIu8, req->schema);
Expand All @@ -988,8 +1019,10 @@
FAIL_IF_CHECKPOINTING;
req->sql = request.sql;
g->req = req;
rv = leader__barrier(g->leader, &g->barrier, querySqlBarrierCb);
if (rv != 0) {
rv = leader_barrier_v2(g->leader, &g->barrier, query_sql_barrier_cb);
if (rv == LEADER_NOT_ASYNC) {
query_sql_bottom_half(g, 0);
} else if (rv != 0) {
tracef("handle query sql barrier failed %d", rv);
g->req = NULL;
return rv;
Expand Down
Loading
Loading