Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload replication writes to IO threads #1485

Merged
merged 8 commits into from
Feb 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica reads are not offloaded to IO threads. */
/* For simplicity, don't offload replica clients reads as read traffic from replica is negligible */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
Expand Down Expand Up @@ -364,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
if (c->io_write_state != CLIENT_IDLE) return C_OK;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica writes are not offloaded to IO threads. */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* For simplicity, avoid offloading non-online replicas */
if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_data->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;

Expand All @@ -392,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);

/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
replBufBlock *o = listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = o->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
}
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);

/* The main-thread will update the client state after the I/O thread completes the write. */
connSetPostponeUpdateState(c->conn, 1);
c->write_flags = 0;
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
c->io_write_state = CLIENT_PENDING_IO;

IOJobQueue_push(jq, ioThreadWriteToClient, c);
Expand Down
152 changes: 123 additions & 29 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,9 @@ void disconnectReplicas(void) {
void unlinkClient(client *c) {
listNode *ln;

/* Wait for IO operations to be done before unlinking the client. */
waitForClientIO(c);

/* If this is marked as current client unset it. */
if (c->conn && server.current_client == c) server.current_client = NULL;

Expand Down Expand Up @@ -1934,36 +1937,122 @@ client *lookupClientByID(uint64_t id) {
return c;
}

void writeToReplica(client *c) {
/* Can be called from main-thread only as replica write offload is not supported yet */
serverAssert(inMainThread());
int nwritten = 0;
/* Bookkeeping after bytes have been written to a replica client from the
 * shared replication buffer (by either the main thread or an I/O thread).
 * Consumes c->nwritten: advances the client's reference into the global
 * replication buffer block list, dropping refcounts on blocks that were
 * fully sent so the backlog can be incrementally trimmed.
 * No-op when nothing was written or the write failed (c->nwritten <= 0). */
static void postWriteToReplica(client *c) {
if (c->nwritten <= 0) return;

/* Account the bytes as replication output traffic. */
server.stat_net_repl_output_bytes += c->nwritten;

/* Locate the last node which has leftover data and
 * decrement reference counts of all nodes in front of it.
 * Set c->ref_repl_buf_node to point to the last node and
 * c->ref_block_pos to the offset within that node */
listNode *curr = c->repl_data->ref_repl_buf_node;
listNode *next = NULL;
/* Total bytes consumed measured from the start of the current block:
 * previously-consumed offset within the block plus what was just written. */
size_t nwritten = c->nwritten + c->repl_data->ref_block_pos;
replBufBlock *o = listNodeValue(curr);

while (nwritten >= o->used) {
next = listNextNode(curr);
if (!next) break; /* End of list */

/* This block was fully sent: release our reference on it and take a
 * reference on the next block before advancing to it. */
nwritten -= o->used;
o->refcount--;

curr = next;
o = listNodeValue(curr);
o->refcount++;
}

/* The remaining offset must fall within the block we stopped at. */
serverAssert(nwritten <= o->used);
c->repl_data->ref_repl_buf_node = curr;
c->repl_data->ref_block_pos = nwritten;

/* Blocks whose refcount dropped to zero may now be reclaimable;
 * trim the replication backlog a bounded amount per call. */
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}

static void writeToReplica(client *c) {
listNode *last_node;
size_t bufpos;

serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
while (clientHasPendingReplies(c)) {
replBufBlock *o = listNodeValue(c->repl_data->ref_repl_buf_node);
serverAssert(o->used >= c->repl_data->ref_block_pos);

/* Send current block if it is not fully sent. */
if (o->used > c->repl_data->ref_block_pos) {
nwritten = connWrite(c->conn, o->buf + c->repl_data->ref_block_pos, o->used - c->repl_data->ref_block_pos);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
return;
}
c->nwritten += nwritten;
c->repl_data->ref_block_pos += nwritten;
/* Determine the last block and buffer position based on thread context */
if (inMainThread()) {
last_node = listLast(server.repl_buffer_blocks);
if (!last_node) return;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
} else {
last_node = c->io_last_reply_block;
serverAssert(last_node != NULL);
bufpos = c->io_last_bufpos;
}

listNode *first_node = c->repl_data->ref_repl_buf_node;

/* Handle the single block case */
if (first_node == last_node) {
replBufBlock *b = listNodeValue(first_node);
c->nwritten = connWrite(c->conn, b->buf + c->repl_data->ref_block_pos, bufpos - c->repl_data->ref_block_pos);
if (c->nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
}
return;
}

/* Multiple blocks case */
ssize_t total_bytes = 0;
int iovcnt = 0;
struct iovec iov_arr[IOV_MAX];
struct iovec *iov = iov_arr;
int iovmax = min(IOV_MAX, c->conn->iovcnt);

for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
replBufBlock *cur_block = listNodeValue(cur_node);
size_t start = (cur_node == first_node) ? c->repl_data->ref_block_pos : 0;
size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
len -= start;

iov[iovcnt].iov_base = cur_block->buf + start;
iov[iovcnt].iov_len = len;
total_bytes += len;
iovcnt++;
if (cur_node == last_node) break;
}

if (total_bytes == 0) return;

ssize_t totwritten = 0;
while (iovcnt > 0) {
int nwritten = connWritev(c->conn, iov, iovcnt);

if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
c->nwritten = (totwritten > 0) ? totwritten : nwritten;
return;
}

totwritten += nwritten;

if (totwritten == total_bytes) {
break;
}

/* If we fully sent the object on head, go to the next one. */
listNode *next = listNextNode(c->repl_data->ref_repl_buf_node);
if (next && c->repl_data->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->repl_data->ref_repl_buf_node = next;
c->repl_data->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
/* Update iov array */
while (nwritten > 0) {
if ((size_t)nwritten < iov[0].iov_len) {
/* partial block written */
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
iov[0].iov_len -= nwritten;
break;
}

/* full block written */
nwritten -= iov[0].iov_len;
iov++;
iovcnt--;
}
}

c->nwritten = totwritten;
}

/* This function should be called from _writeToClient when the reply list is not empty,
Expand Down Expand Up @@ -2158,7 +2247,7 @@ int postWriteToClient(client *c) {
if (getClientType(c) != CLIENT_TYPE_REPLICA) {
_postWriteToClient(c);
} else {
server.stat_net_repl_output_bytes += c->nwritten > 0 ? c->nwritten : 0;
postWriteToReplica(c);
}

if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
Expand Down Expand Up @@ -2718,7 +2807,7 @@ void processMultibulkBuffer(client *c) {
serverAssertWithInfo(c, NULL, c->argc == 0);

/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
madolson marked this conversation as resolved.
Show resolved Hide resolved
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
Expand Down Expand Up @@ -2795,7 +2884,7 @@ void processMultibulkBuffer(client *c) {
while (c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
Expand Down Expand Up @@ -5024,7 +5113,12 @@ void ioThreadWriteToClient(void *data) {
client *c = data;
serverAssert(c->io_write_state == CLIENT_PENDING_IO);
c->nwritten = 0;
_writeToClient(c);
if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
writeToReplica(c);
} else {
_writeToClient(c);
}

atomic_thread_fence(memory_order_release);
c->io_write_state = CLIENT_COMPLETED_IO;
}
2 changes: 0 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4238,8 +4238,6 @@ void replicationCachePrimary(client *c) {
serverAssert(server.primary != NULL && server.cached_primary == NULL);
serverLog(LL_NOTICE, "Caching the disconnected primary state.");

/* Wait for IO operations to be done before proceeding */
waitForClientIO(c);
/* Unlink the client from the server structures. */
unlinkClient(c);

Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ void dictVanillaFree(void *val);

/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)

#define WRITE_FLAGS_IS_REPLICA (1 << 1)

client *createClient(connection *conn);
void freeClient(client *c);
Expand Down
Loading
Loading