Skip to content

Commit

Permalink
Merge pull request #604 from just-now/no-queue-macro
Browse files Browse the repository at this point in the history
Replace queue macro with inline functions
  • Loading branch information
cole-miller committed Mar 1, 2024
2 parents f2bfdff + 27e7835 commit 645e618
Show file tree
Hide file tree
Showing 36 changed files with 381 additions and 447 deletions.
2 changes: 1 addition & 1 deletion bt/request
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct request
void *data;
int type;
raft_index index;
void *queue[2];
queue queue;
};
uprobe:$libraft_path:lifecycleRequestStart
Expand Down
4 changes: 2 additions & 2 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ int db__init(struct db *db, struct config *config, const char *filename)
db->follower = NULL;
db->tx_id = 0;
db->read_lock = 0;
QUEUE__INIT(&db->leaders);
queue_init(&db->leaders);
return 0;

err_after_path_alloc:
Expand All @@ -59,7 +59,7 @@ int db__init(struct db *db, struct config *config, const char *filename)

void db__close(struct db *db)
{
assert(QUEUE__IS_EMPTY(&db->leaders));
assert(queue_empty(&db->leaders));
if (db->follower != NULL) {
int rc;
rc = sqlite3_close(db->follower);
Expand Down
56 changes: 28 additions & 28 deletions src/fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,10 @@ static unsigned snapshotNumBufs(struct fsm *f)
queue *head;
unsigned n = 1; /* snapshot header */

QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
n += 2; /* database header & wal */
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
n += dbNumPages(db); /* 1 buffer per page (zero copy) */
}

Expand Down Expand Up @@ -539,12 +539,12 @@ static void freeSnapshotBufs(struct fsm *f,

i = 1;
/* Free all database headers & WAL buffers */
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
if (i == n_bufs) {
break;
}
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
/* i is the index of the database header */
sqlite3_free(bufs[i].base);
/* i is now the index of the next database header (if any) */
Expand All @@ -567,19 +567,19 @@ static int fsm__snapshot(struct raft_fsm *fsm,

/* First count how many databases we have and check that no transaction
* nor checkpoint nor other snapshot is in progress. */
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
if (db->tx_id != 0 || db->read_lock) {
return RAFT_BUSY;
}
n_db++;
}

/* Lock all databases, preventing the checkpoint from running */
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
rv = databaseReadLock(db);
assert(rv == 0);
}
Expand All @@ -598,9 +598,9 @@ static int fsm__snapshot(struct raft_fsm *fsm,

/* Encode individual databases. */
i = 1;
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
/* database_header + num_pages + wal */
unsigned n = 1 + dbNumPages(db) + 1;
rv = encodeDatabase(db, &(*bufs)[i], n);
Expand All @@ -618,9 +618,9 @@ static int fsm__snapshot(struct raft_fsm *fsm,
err_after_bufs_alloc:
sqlite3_free(*bufs);
err:
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
databaseReadUnlock(db);
}
assert(rv != 0);
Expand Down Expand Up @@ -663,12 +663,12 @@ static int fsm__snapshot_finalize(struct raft_fsm *fsm,
/* Unlock all databases that were locked for the snapshot, this is safe
* because DB's are only ever added at the back of the queue. */
n_db = 0;
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
if (n_db == header.n) {
break;
}
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
rv = databaseReadUnlock(db);
assert(rv == 0);
n_db++;
Expand Down Expand Up @@ -815,7 +815,7 @@ static unsigned snapshotNumBufsDisk(struct fsm *f)
queue *head;
unsigned n = 1; /* snapshot header */

QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
n += 3; /* database header, database file and wal */
}
Expand Down Expand Up @@ -849,7 +849,7 @@ static void freeSnapshotBufsDisk(struct fsm *f,

i = 1;
/* Free all database headers & WAL buffers. Unmap the DB file. */
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
if (i == n_bufs) {
break;
Expand Down Expand Up @@ -878,9 +878,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm,

/* First count how many databases we have and check that no transaction
* nor checkpoint nor other snapshot is in progress. */
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
if (db->tx_id != 0 || db->read_lock) {
return RAFT_BUSY;
}
Expand All @@ -890,9 +890,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm,
/* Lock all databases, preventing the checkpoint from running. This
* ensures the database is not written while it is mmap'ed and copied by
* raft. */
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
rv = databaseReadLock(db);
assert(rv == 0);
}
Expand All @@ -917,9 +917,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm,

/* Copy WAL of all databases. */
i = 1;
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
/* database_header + db + WAL */
unsigned n = 3;
/* pass pointer to buffer that will contain WAL. */
Expand All @@ -938,9 +938,9 @@ static int fsm__snapshot_disk(struct raft_fsm *fsm,
err_after_bufs_alloc:
sqlite3_free(*bufs);
err:
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
databaseReadUnlock(db);
}
assert(rv != 0);
Expand Down Expand Up @@ -972,13 +972,13 @@ static int fsm__snapshot_async_disk(struct raft_fsm *fsm,

/* Encode individual databases. */
i = 1;
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
if (i == *n_bufs) {
/* In case a db was added in meanwhile */
break;
}
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
/* database_header + database file + wal */
unsigned n = 3;
rv = encodeDiskDatabaseAsync(db, &(*bufs)[i], n);
Expand Down Expand Up @@ -1031,12 +1031,12 @@ static int fsm__snapshot_finalize_disk(struct raft_fsm *fsm,
/* Unlock all databases that were locked for the snapshot, this is safe
* because DB's are only ever added at the back of the queue. */
n_db = 0;
QUEUE__FOREACH(head, &f->registry->dbs)
QUEUE_FOREACH(head, &f->registry->dbs)
{
if (n_db == header.n) {
break;
}
db = QUEUE__DATA(head, struct db, queue);
db = QUEUE_DATA(head, struct db, queue);
databaseReadUnlock(db);
n_db++;
}
Expand Down
4 changes: 2 additions & 2 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ int leader__init(struct leader *l, struct db *db, struct raft *raft)

l->exec = NULL;
l->inflight = NULL;
QUEUE__PUSH(&db->leaders, &l->queue);
queue_insert_tail(&db->leaders, &l->queue);
return 0;
}

Expand All @@ -153,7 +153,7 @@ void leader__close(struct leader *l)
rc = sqlite3_close(l->conn);
assert(rc == 0);

QUEUE__REMOVE(&l->queue);
queue_remove(&l->queue);
}

/* A checkpoint command that fails to commit is not a huge issue.
Expand Down
Loading

0 comments on commit 645e618

Please sign in to comment.