Skip to content

Commit

Permalink
Remove remaining traces of local_data
Browse files Browse the repository at this point in the history
I've preserved is_local since it's still potentially useful.

Signed-off-by: Cole Miller <m@cole-miller.net>
  • Loading branch information
cole-miller committed Sep 18, 2024
1 parent 4d462c5 commit 680cf42
Show file tree
Hide file tree
Showing 18 changed files with 38 additions and 105 deletions.
10 changes: 0 additions & 10 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,7 @@ static void leaderMaybeCheckpointLegacy(struct leader *l)
tracef("raft_malloc - no mem");
goto err_after_buf_alloc;
}
#ifdef USE_SYSTEM_RAFT
rv = raft_apply(l->raft, apply, &buf, 1, leaderCheckpointApplyCb);
#else
rv = raft_apply(l->raft, apply, &buf, NULL, 1, leaderCheckpointApplyCb);
#endif
if (rv != 0) {
tracef("raft_apply failed %d", rv);
raft_free(apply);
Expand Down Expand Up @@ -336,13 +332,7 @@ static int leaderApplyFrames(struct exec *req,
apply->type = COMMAND_FRAMES;
idSet(apply->req.req_id, req->id);

#ifdef USE_SYSTEM_RAFT
rv = raft_apply(l->raft, &apply->req, &buf, 1, leaderApplyFramesCb);
#else
/* TODO actual WAL slice goes here */
struct raft_entry_local_data local_data = {};
rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, leaderApplyFramesCb);
#endif
if (rv != 0) {
tracef("raft apply failed %d", rv);
goto err_after_command_encode;
Expand Down
30 changes: 0 additions & 30 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,28 +198,6 @@ enum {
RAFT_CHANGE /* Raft configuration change. */
};

/**
* A small fixed-size inline buffer that stores extra data for a raft_entry
* that is different for each node in the cluster.
*
* A leader initializes the local data for an entry before passing it into
* raft_apply. This local data is stored in the volatile raft log and also
* in the persistent raft log on the leader. AppendEntries messages sent by
* the leader never contain the local data for entries.
*
* When a follower accepts an AppendEntries request, it invokes a callback
* provided by the FSM to fill out the local data for each new entry before
* appending the entries to its log (volatile and persistent). This local
* data doesn't have to be the same as the local data that the leader computed.
*
* When starting up, a raft node reads the local data for each entry for its
* persistent log as part of populating the volatile log.
*/
struct raft_entry_local_data {
/* Must be the only member of this struct. */
uint8_t buf[16];
};

/**
* A single entry in the raft log.
*
Expand Down Expand Up @@ -250,20 +228,13 @@ struct raft_entry_local_data {
* message or in the persistent log. This field can be used by the FSM's `apply`
* callback to handle a COMMAND entry differently depending on whether it
* originated locally.
*
* Note: The @local_data and @is_local fields do not exist when we use an external
* libraft, because the last separate release of libraft predates their addition.
* The ifdef at the very top of this file ensures that we use the system raft headers
* when we build against an external libraft, so there will be no ABI mismatch as
* a result of incompatible struct layouts.
*/
struct raft_entry
{
raft_term term; /* Term in which the entry was created. */
unsigned short type; /* Type (FSM command, barrier, config change). */
bool is_local; /* Placed here so it goes in the padding after @type. */
struct raft_buffer buf; /* Entry data. */
struct raft_entry_local_data local_data;
void *batch; /* Batch that buf's memory points to, if any. */
};

Expand Down Expand Up @@ -1244,7 +1215,6 @@ struct raft_apply
RAFT_API int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb);

Expand Down
5 changes: 2 additions & 3 deletions src/raft/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb)
{
Expand Down Expand Up @@ -42,7 +41,7 @@ int raft_apply(struct raft *r,
req->cb = cb;

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, local_data, n);
rv = logAppendCommands(r->log, r->current_term, bufs, n);
if (rv != 0) {
goto err;
}
Expand Down Expand Up @@ -91,7 +90,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
req->index = index;
req->cb = cb;

rv = logAppend(r->log, r->current_term, RAFT_BARRIER, buf, (struct raft_entry_local_data){}, true, NULL);
rv = logAppend(r->log, r->current_term, RAFT_BARRIER, buf, true, NULL);
if (rv != 0) {
goto err_after_buf_alloc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ static void copyLeaderLog(struct raft_fixture *f)
assert(buf.base != NULL);
memcpy(buf.base, entry->buf.base, buf.len);
/* FIXME(cole) what to do here for is_local? */
rv = logAppend(f->log, entry->term, entry->type, buf, (struct raft_entry_local_data){}, false, NULL);
rv = logAppend(f->log, entry->term, entry->type, buf, false, NULL);
assert(rv == 0);
}
logRelease(raft->log, 1, entries, n);
Expand Down
8 changes: 2 additions & 6 deletions src/raft/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ int logAppend(struct raft_log *l,
const raft_term term,
const unsigned short type,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch)
{
Expand Down Expand Up @@ -576,7 +575,6 @@ int logAppend(struct raft_log *l,
entry->type = type;
entry->buf = buf;
entry->batch = batch;
entry->local_data = local_data;
entry->is_local = is_local;

l->back += 1;
Expand All @@ -588,7 +586,6 @@ int logAppend(struct raft_log *l,
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n)
{
unsigned i;
Expand All @@ -600,8 +597,7 @@ int logAppendCommands(struct raft_log *l,
assert(n > 0);

for (i = 0; i < n; i++) {
struct raft_entry_local_data loc = (local_data != NULL) ? local_data[i] : (struct raft_entry_local_data){};
rv = logAppend(l, term, RAFT_COMMAND, bufs[i], loc, true, NULL);
rv = logAppend(l, term, RAFT_COMMAND, bufs[i], true, NULL);
if (rv != 0) {
return rv;
}
Expand All @@ -628,7 +624,7 @@ int logAppendConfiguration(struct raft_log *l,
}

/* Append the new entry to the log. */
rv = logAppend(l, term, RAFT_CHANGE, buf, (struct raft_entry_local_data){}, true, NULL);
rv = logAppend(l, term, RAFT_CHANGE, buf, true, NULL);
if (rv != 0) {
goto err_after_encode;
}
Expand Down
2 changes: 0 additions & 2 deletions src/raft/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,13 @@ int logAppend(struct raft_log *l,
raft_term term,
unsigned short type,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch);

/* Convenience to append a series of #RAFT_COMMAND entries. */
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n);

/* Convenience to encode and append a single #RAFT_CHANGE entry. */
Expand Down
2 changes: 1 addition & 1 deletion src/raft/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,7 @@ int replicationAppend(struct raft *r,
goto err_after_request_alloc;
}

rv = logAppend(r->log, copy.term, copy.type, copy.buf, (struct raft_entry_local_data){}, false, NULL);
rv = logAppend(r->log, copy.term, copy.type, copy.buf, false, NULL);
if (rv != 0) {
goto err_after_request_alloc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/start.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static int restoreEntries(struct raft *r,
for (i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
rv = logAppend(r->log, entry->term, entry->type, entry->buf,
entry->local_data, entry->is_local, entry->batch);
entry->is_local, entry->batch);
if (rv != 0) {
goto err;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ static size_t uvAppendSize(struct uvAppend *a)
{
size_t size = sizeof(uint32_t) * 2; /* CRC checksums */
unsigned i;
size += uvSizeofBatchHeader(a->n, true); /* Batch header */
size += uvSizeofBatchHeader(a->n); /* Batch header */
for (i = 0; i < a->n; i++) { /* Entries data */
size += bytePad64(a->entries[i].buf.len);
}
Expand Down
24 changes: 6 additions & 18 deletions src/raft/uv_encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ static size_t sizeofTimeoutNow(void)
sizeof(uint64_t) /* Last log term. */;
}

size_t uvSizeofBatchHeader(size_t n, bool with_local_data)
size_t uvSizeofBatchHeader(size_t n)
{
size_t res = 8 + /* Number of entries in the batch, little endian */
16 * n; /* One header per entry */;
(void)with_local_data;
return res;
}

Expand Down Expand Up @@ -139,7 +138,7 @@ static void encodeAppendEntries(const struct raft_append_entries *p, void *buf)
bytePut64(&cursor, p->prev_log_term); /* Previous term. */
bytePut64(&cursor, p->leader_commit); /* Commit index. */

uvEncodeBatchHeader(p->entries, p->n_entries, cursor, false /* no local data */);
uvEncodeBatchHeader(p->entries, p->n_entries, cursor);
}

static void encodeAppendEntriesResult(
Expand Down Expand Up @@ -297,17 +296,14 @@ int uvEncodeMessage(const struct raft_message *message,

void uvEncodeBatchHeader(const struct raft_entry *entries,
unsigned n,
void *buf,
bool with_local_data)
void *buf)
{
unsigned i;
void *cursor = buf;

/* Number of entries in the batch, little endian */
bytePut64(&cursor, n);

(void)with_local_data;

for (i = 0; i < n; i++) {
const struct raft_entry *entry = &entries[i];

Expand Down Expand Up @@ -368,8 +364,7 @@ static void decodeRequestVoteResult(const uv_buf_t *buf,

int uvDecodeBatchHeader(const void *batch,
struct raft_entry **entries,
unsigned *n,
uint64_t *local_data_size)
unsigned *n)
{
const void *cursor = batch;
size_t i;
Expand All @@ -382,8 +377,6 @@ int uvDecodeBatchHeader(const void *batch,
return 0;
}

(void)local_data_size;

*entries = raft_malloc(*n * sizeof **entries);

if (*entries == NULL) {
Expand Down Expand Up @@ -438,7 +431,7 @@ static int decodeAppendEntries(const uv_buf_t *buf,
args->prev_log_term = byteGet64(&cursor);
args->leader_commit = byteGet64(&cursor);

rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries, false);
rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries);
if (rv != 0) {
return rv;
}
Expand Down Expand Up @@ -560,8 +553,7 @@ int uvDecodeMessage(uint16_t type,
int uvDecodeEntriesBatch(uint8_t *batch,
size_t offset,
struct raft_entry *entries,
unsigned n,
uint64_t local_data_size)
unsigned n)
{
uint8_t *cursor;

Expand All @@ -581,10 +573,6 @@ int uvDecodeEntriesBatch(uint8_t *batch,
}

entry->is_local = false;

entry->local_data = (struct raft_entry_local_data){};
assert(local_data_size <= sizeof(entry->local_data.buf));
assert(local_data_size % 8 == 0);
}
return 0;
}
12 changes: 4 additions & 8 deletions src/raft/uv_encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ int uvDecodeMessage(uint16_t type,

int uvDecodeBatchHeader(const void *batch,
struct raft_entry **entries,
unsigned *n,
uint64_t *local_data_size);
unsigned *n);

int uvDecodeEntriesBatch(uint8_t *batch,
size_t offset,
struct raft_entry *entries,
unsigned n,
uint64_t local_data_size);
unsigned n);

/**
* The layout of the memory pointed at by a @batch pointer is the following:
Expand All @@ -47,17 +45,15 @@ int uvDecodeEntriesBatch(uint8_t *batch,
* [1 byte ] Message type (Either RAFT_COMMAND or RAFT_CHANGE)
* [3 bytes] Currently unused.
* [4 bytes] Size of the log entry data, little endian.
* [8 bytes] Size of the local buffer, little endian.
*
* A payload data section for an entry is simply a sequence of bytes of
* arbitrary lengths, possibly padded with extra bytes to reach 8-byte boundary
* (which means that all entry data pointers are 8-byte aligned).
*/
size_t uvSizeofBatchHeader(size_t n, bool with_local_data);
size_t uvSizeofBatchHeader(size_t n);

void uvEncodeBatchHeader(const struct raft_entry *entries,
unsigned n,
void *buf,
bool with_local_data);
void *buf);

#endif /* UV_ENCODING_H_ */
3 changes: 1 addition & 2 deletions src/raft/uv_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ static void uvServerReadCb(uv_stream_t *stream,
payload.base, 0,
s->message.append_entries.entries,
s->message.append_entries
.n_entries,
false);
.n_entries);
break;
case RAFT_IO_INSTALL_SNAPSHOT:
s->message.install_snapshot.data.base =
Expand Down
Loading

0 comments on commit 680cf42

Please sign in to comment.