Skip to content

Commit

Permalink
Rdb channel for full sync
Browse files Browse the repository at this point in the history
In this PR we introduce the main benefit of rdb channel by continuously
streaming the COB in parallel to the RDB and thus keeping the primary
side COB small AND accelerating the overall sync process.
By streaming the replication data to the replica during the full sync,
we reduce
1. Memory load from the primary node.
2. CPU load from the primary main process (will be introduced in later
   PR).
This opens up possibilities for future improvements with better
TLS connection handling and removal of the need to pipeline the RDB
from the child process to the main process.
  • Loading branch information
naglera committed Apr 27, 2023
1 parent 4a93f4d commit cc74971
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
16 changes: 8 additions & 8 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2597,7 +2597,7 @@ void fullSyncWithMaster(connection* conn) {

/* Send replica capabilities */
if (server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_DEBUG, "Recived first reply from primary using rdb connection. Sending capa");
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection. Sending capa");

/* Send replica lisening port to master for clarification */
sds portstr = getReplicaPortString();
Expand All @@ -2616,7 +2616,7 @@ void fullSyncWithMaster(connection* conn) {
}
return;
}
/* Recive replconf response */
/* Receive replconf response */
if (server.repl_state == REPL_SEC_CONN_RECEIVE_REPLCONF_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
Expand All @@ -2637,7 +2637,7 @@ void fullSyncWithMaster(connection* conn) {
server.repl_state = REPL_SEC_CONN_RECEIVE_ENDOFF;
return;
}
/* Recive ENDOFF reply */
/* Receive ENDOFF reply */
if (server.repl_state == REPL_SEC_CONN_RECEIVE_ENDOFF) {
char buf[PROTO_IOBUF_LEN];
connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000);
Expand Down Expand Up @@ -2734,7 +2734,7 @@ void streamReplDataBufToDb(client *c) {
}

/* There are two scenarios in which this method can be called during rdb-channel-sync:
* 1. Main connection succesfully established psync with primary.
* 1. Main connection successfully established psync with primary.
* 2. Rdb connection done loading rdb.
* Each time this method is invoked, we check whether the other connection has completed
* his part and act accordingly */
Expand Down Expand Up @@ -2952,9 +2952,9 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';
if (isOngoingRdbChannelSync()) {
/* Incase we are using rdb connection sync, the master is already initialized by now
* In this case we will just verify his id. Notice that in this case chached master is not
* relevent because it wasn't used */
/* In case we are using rdb connection sync, the master is already initialized by now
* In this case we will just verify his id. Notice that in this case cached master is not
* relevant because it wasn't used */
serverAssert(strcmp(new, server.psync_master->replid) == 0);
server.psync_master->read_reploff = server.psync_master->reploff;
/* Disconnect all the sub-slaves: they need to be notified. */
Expand Down Expand Up @@ -3471,7 +3471,7 @@ int cancelReplicationHandshake(int reconnect) {
server.repl_state = REPL_STATE_CONNECT;
} else if (server.repl_state == REPL_SEC_CONN_RECEIVE_PSYNC_REPLY ||
server.repl_state == REPL_SEC_CONN_SEND_PSYNC){
/* As we cancel the sync, the rdb connection state is no longer relevent */
/* As we cancel the sync, the rdb connection state is no longer relevant */
server.repl_state = REPL_STATE_TRANSFER;
replicationAbortSyncTransfer();
server.repl_state = REPL_STATE_CONNECT;
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ struct redisServer {
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
redisAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
redisAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
redisAtomic long long stat_repl_processed_bytes; /* Bytes proccessed from replica's local replication buffer after full sync */
redisAtomic long long stat_repl_processed_bytes; /* Bytes processed from replica's local replication buffer after full sync */
size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ start_server {tags {"repl rdb-channel external:skip"}} {
wait_for_condition 50 1000 {
[$replica get foo] == 1
} else {
fail "Replica isn't conencted"
fail "Replica isn't connected"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ proc start_write_load {host port seconds} {
}

# Execute a background process writing only one key for the specified number
# of seconds to the specified Redis instance. This load handler is usefull for
# of seconds to the specified Redis instance. This load handler is useful for
# tests which requires heavy replication stream but no memory load.
proc start_one_key_write_load {host port seconds key} {
set tclsh [info nameofexecutable]
Expand Down

0 comments on commit cc74971

Please sign in to comment.