Offload replication writes to IO threads #1485
base: unstable
Conversation
Signed-off-by: Uri Yagelnik <uriy@amazon.com>

Force-pushed from 3aee49b to 846c816.
Codecov Report

@@ Coverage Diff @@
##           unstable    #1485      +/-   ##
============================================
+ Coverage     70.78%   70.85%    +0.07%
============================================
  Files           119      119
  Lines         64691    64928      +237
============================================
+ Hits          45790    46005      +215
- Misses        18901    18923       +22
listNode *last_node;
size_t bufpos;

serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
while (clientHasPendingReplies(c)) {
    replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
    serverAssert(o->used >= c->ref_block_pos);

    /* Send current block if it is not fully sent. */
    if (o->used > c->ref_block_pos) {
        nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
        if (nwritten <= 0) {
            c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
            return;
        }
        c->nwritten += nwritten;
        c->ref_block_pos += nwritten;

/* Determine the last block and buffer position based on thread context */
if (inMainThread()) {
    last_node = listLast(server.repl_buffer_blocks);
    if (!last_node) return;
    bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
} else {
    last_node = c->io_last_reply_block;
    serverAssert(last_node != NULL);
    bufpos = c->io_last_bufpos;
}
Consider simplifying this code to reduce duplication and improve clarity, e.g.:

listNode *last_node = inMainThread() ? listLast(server.repl_buffer_blocks) : c->io_last_reply_block;
if (!last_node) return;
size_t bufpos = inMainThread() ? ((replBufBlock *)listNodeValue(last_node))->used : c->io_last_bufpos;
I believe the current version is clearer since we check inMainThread() only once. Additionally, we handle the !last_node case differently depending on whether we are in the main thread or not.
replBufBlock *block = zmalloc(sizeof(replBufBlock) + 128);
block->size = 128;
block->used = 100;
block->refcount = 1;

listAddNodeTail(server.repl_buffer_blocks, block);
Looks like this code is duplicated; I suggest refactoring it into:

appendReplBufBlock(size_t size, size_t used);
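Something along these lines, based on the snippet above (a sketch of the suggested helper, not code from the PR):

/* Allocate a block with `size` payload bytes, mark `used` of them as
 * filled, and append it to the global replication buffer list. */
static replBufBlock *appendReplBufBlock(size_t size, size_t used) {
    replBufBlock *block = zmalloc(sizeof(replBufBlock) + size);
    block->size = size;
    block->used = used;
    block->refcount = 1;
    listAddNodeTail(server.repl_buffer_blocks, block);
    return block;
}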
The size and used values differ each time.
FYI: I tested this code and got the following error. I did not see it with HEAD~1.
2576783:S 06 Jan 2025 16:06:12.381 # Protocol error (Master using the inline protocol. Desync?) from client: id=6 addr=127.0.0.1:6379 laddr=127.0.0.1:56284 fd=11 name=*redacted* age=43 idle=0 flags=M db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=22870 qbuf-free=18084 argv-mem=0 multi-mem=0 rbs=1024 rbp=42 obl=0 oll=0 omem=0 tot-mem=42880 events=r cmd=set user=*redacted* redir=-1 resp=2 lib-name= lib-ver= tot-net-in=218001645 tot-net-out=1809 tot-cmds=1267450. Query buffer during protocol error: 'SET..$16..key:__rand_int__..$128..VXKeHogKgJ=[5V9_X^b?48OKF2jGA<' (... more 896 bytes ...) 'mcS2^N1J?ELSX@CfKQ7cM5aea\ngY8a3LGgNVa9eRA46XS8>7ABe1>Jl9O\Rm\..'
I ran: src/valkey-benchmark -t set -d 128 -n 5000000 --threads 10, against valkey with 4 IO threads.
It looks like the data is corrupted; look at the second message:
*3\r $3\r SET\r $16\r key:000000630274\r $128\r VXKeHogKgJ=[5V9_X^b?48OKF2jGA<f:iR@50o7dS3JV4Q6L68lC[GTA]0DaMg?_oSmcS2^N1J?ELSX@CfKQ7cM5aea\\ngY8a3LGgNVa9eRA46XS8>7ABe1>Jl9O\\Rm\\\r *3\r $3\r SET\r $16\r key:000000420097\r $128\r VXKeHogKgJ=[5V9_X^b?48OKF2jGA<f:iR@50o7dS3JV4Q6L68lC[GTA]0DaMg?_oSmcS2^N1J?ELSX@CfKQ7cM5aea\\ngY8a3LGgNVa9eRA46XS8>7ABe1>Jl9O\\Rm\\\r o7dS3JV4Q6L68lC[GTA]0DaMg?_oSmcS2^N1J?ELSX@CfKQ7cM5aea\\ngY8a3LGgNVa9eRA46XS8>7ABe1>Jl9O\\Rm\\\r
The payload o7dS3JV4Q6L68lC[GTA]0DaMg?_oSmcS2^N1J?ELSX@CfKQ7cM5aea\\ngY8a3LGgNVa9eRA46XS8>7ABe1>Jl9O\\Rm\\\r
is duplicated.
Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Many thanks @xbasel for finding it.
src/replication.c
Outdated
@@ -4136,6 +4136,8 @@ void replicationCachePrimary(client *c) {
    serverAssert(server.primary != NULL && server.cached_primary == NULL);
    serverLog(LL_NOTICE, "Caching the disconnected primary state.");

    /* Wait for IO operations to be done before proceeding */
Why? I guess I don't immediately understand why we need to wait for IO on these client unlinks, but not in the other places we free the client. EDIT: I suppose this is related to the fact that the client might be reused later.
I'm not sure if this is required, but I thought it would be safer to check that no I/O operations are running before unlinking the client. After thinking about it, we already wait for client I/O in freeClient, so this check isn't needed here since this path is reached from freeClient. I moved the check inside unlinkClient instead, since it might be called from other places without freeClient being called first.
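A minimal sketch of the placement described above (assuming waitForClientIO is the helper that blocks until the IO threads are done with a client; this illustrates the idea, not the PR's final code):

void unlinkClient(client *c) {
    /* Make sure no IO thread is still reading from or writing to this
     * client before detaching it from global state; unlinkClient can be
     * reached without going through freeClient. */
    waitForClientIO(c);
    /* ... remove the client from server.clients, the event loop,
     * replication buffer references, etc. ... */
}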
/* Determine the last block and buffer position based on thread context */
if (inMainThread()) {
    last_node = listLast(server.repl_buffer_blocks);
    if (!last_node) return;
When can last_node be NULL here? We should only be calling writeToReplica when there is data, right?
I believe you're right, but I'm hesitant to deviate from the current implementation, which allows it to be null.
static void writeToReplica(client *c) {
...
while (clientHasPendingReplies(c)) {
Signed-off-by: Uri Yagelnik <uriy@amazon.com>
This PR offloads writes to replica clients to the IO threads.

Main Changes

Implementation Details

In order to offload the writes, writeToReplica has been split into 2 parts.

Additional Changes
- In writeToReplica we now use writev when more than one buffer exists (a rough sketch follows this list).
- Changed the nwritten field to ssize_t, since with a replica nwritten can theoretically exceed int size (replica writes are not subject to the NET_MAX_WRITES_PER_EVENT limit).
- Use memchr instead of strchr when looking for the next \r, as it's more secure and resolves the issue (see the note after this list).
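As a rough illustration of the writev change (a sketch, not the PR's exact code: the helper name writeReplBlocks and the fixed iovec count are made up here; connWritev and the replBufBlock fields follow the snippets quoted above):

#include <sys/uio.h>

/* Sketch: gather the pending replication buffer blocks into a single
 * writev() instead of issuing one connWrite() per block. */
static ssize_t writeReplBlocks(client *c, listNode *first, size_t first_pos) {
    struct iovec iov[16];
    int iovcnt = 0;
    for (listNode *node = first; node && iovcnt < 16; node = listNextNode(node)) {
        replBufBlock *o = listNodeValue(node);
        /* Only the first block starts at a non-zero offset. */
        size_t offset = (node == first) ? first_pos : 0;
        iov[iovcnt].iov_base = o->buf + offset;
        iov[iovcnt].iov_len = o->used - offset;
        iovcnt++;
    }
    return connWritev(c->conn, iov, iovcnt);
}

On the memchr point: strchr scans until a NUL terminator, while memchr takes an explicit length, which is safer on a query buffer that is not guaranteed to be NUL-terminated at the scanned range. A minimal illustration (field names are assumptions, not the PR's code):

/* Bounded scan for the next \r within the unprocessed query buffer. */
char *cr = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);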
Testing

Related issue: #761