Skip to content

Commit

Permalink
Improve dual channel replication stability and fix compatibility issu…
Browse files Browse the repository at this point in the history
…es (#804)

Introduce several changes that improve the stability of dual-channel
replication and fix compatibility issues.

1. Make dual-channel-replication tests more reliable: use pause instead
of forced sleep.
2. Fix race conditions when freeing RDB client.
3. Check if sync was stopped during local buffer streaming.
4. Fix $ENDOFFSET reply format to work on 32-bit machines too.

---------

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
  • Loading branch information
naglera and madolson authored Jul 25, 2024
1 parent da286a5 commit 48ca2c9
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 119 deletions.
18 changes: 10 additions & 8 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"SLEEP-AFTER-FORK-SECONDS <seconds>",
" Stop the server's main process for <seconds> after forking.",
"PAUSE-AFTER-FORK <0|1>",
" Stop the server's main process after fork.",
"DELAY-RDB-CLIENT-FREE-SECOND <seconds>",
" Grace period in seconds for replica main channel to establish psync.",
"DICT-RESIZING <0|1>",
Expand Down Expand Up @@ -995,12 +995,8 @@ void debugCommand(client *c) {
return;
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) {
double sleep_after_fork_seconds;
if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) {
return;
}
server.debug_sleep_after_fork_us = (int)(sleep_after_fork_seconds * 1e6);
} else if (!strcasecmp(c->argv[1]->ptr, "pause-after-fork") && c->argc == 3) {
server.debug_pause_after_fork = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "delay-rdb-client-free-seconds") && c->argc == 3) {
server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr);
Expand Down Expand Up @@ -2305,6 +2301,12 @@ void applyWatchdogPeriod(void) {
}
}

/* Debug helper: stop the calling process by raising SIGSTOP on itself.
 *
 * A notice is logged right before the signal is raised and another one once
 * the function continues past raise(), i.e. after the process has been
 * resumed (per POSIX a stopped process continues on SIGCONT).
 *
 * Called from the replication code when server.debug_pause_after_fork is
 * non-zero; that flag is set through the DEBUG PAUSE-AFTER-FORK subcommand.
 * Takes no arguments and returns nothing. The return value of raise() is
 * not checked. */
void debugPauseProcess(void) {
serverLog(LL_NOTICE, "Process is about to stop.");
/* SIGSTOP cannot be caught, blocked or ignored, so the process is stopped
 * here until something external resumes it. */
raise(SIGSTOP);
/* Reached only after the process has been resumed. */
serverLog(LL_NOTICE, "Process has been continued.");
}

/* Positive input is sleep time in microseconds. Negative input is fractions
* of microseconds, i.e. -10 means 100 nanoseconds. */
void debugDelay(int usec) {
Expand Down
18 changes: 8 additions & 10 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1896,15 +1896,13 @@ int freeClientsInAsyncFreeQueue(void) {
c->rdb_client_disconnect_time = server.unixtime;
serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id,
replicationGetReplicaName(c), server.wait_before_rdb_client_free);
continue;
}
if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) {
serverLog(LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flag.protected_rdb_channel = 0;
}
if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue;
serverLog(LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flag.protected_rdb_channel = 0;
}

if (c->flag.protected) continue;
Expand Down Expand Up @@ -4332,9 +4330,9 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) {
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(), c);
/* Remove RDB connection protection on COB overrun */
c->flag.protected_rdb_channel = 0;

if (async) {
if (async || c->flag.protected_rdb_channel) {
c->flag.protected_rdb_channel = 0;
freeClientAsync(c);
serverLog(LL_WARNING, "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.",
client);
Expand Down
17 changes: 11 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ int startBgsaveForReplication(int mincapa, int req) {
/* Keep the page cache since it'll get used soon */
retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE);
}
if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us);
if (server.debug_pause_after_fork) debugPauseProcess();
} else {
serverLog(LL_WARNING, "BGSAVE for replication: replication information not available, can't generate the RDB "
"file right now. Try later.");
Expand Down Expand Up @@ -2654,7 +2654,7 @@ static void fullSyncWithPrimary(connection *conn) {
}
/* Receive end offset response */
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_ENDOFF) {
int64_t rdb_client_id;
uint64_t rdb_client_id;
err = receiveSynchronousResponse(conn);
if (err == NULL) goto error;
if (err[0] == '\0') {
Expand All @@ -2667,7 +2667,7 @@ static void fullSyncWithPrimary(connection *conn) {
char primary_replid[CONFIG_RUN_ID_SIZE + 1];
int dbid;
/* Parse end offset response */
char *endoff_format = "$ENDOFF:%lld %40s %d %ld";
char *endoff_format = "$ENDOFF:%lld %40s %d %llu";
if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) {
goto error;
}
Expand Down Expand Up @@ -2859,9 +2859,14 @@ void dualChannelSyncSuccess(void) {
server.primary_initial_offset = server.repl_provisional_primary.reploff;
replicationResurrectProvisionalPrimary();
/* Wait for the accumulated buffer to be processed before reading any more replication updates */
if (streamReplDataBufToDb(server.primary) == C_ERR) {
if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) {
/* Sync session aborted during repl data streaming. */
serverLog(LL_WARNING, "Failed to stream local replication buffer into memory");
/* Verify sync is still in progress */
if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
replicationAbortDualChannelSyncTransfer();
replicationUnsetPrimary();
}
return;
}
freePendingReplDataBuf();
Expand Down Expand Up @@ -3160,7 +3165,7 @@ void setupMainConnForPsync(connection *conn) {
char *err = NULL;
if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
/* We already have an initialized connection at primary side, we only need to associate it with RDB connection */
ll2string(llstr, sizeof(llstr), server.rdb_client_id);
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
if (err) goto error;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
Expand All @@ -3181,7 +3186,7 @@ void setupMainConnForPsync(connection *conn) {
}

if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us);
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
cancelReplicationHandshake(1);
Expand Down
5 changes: 3 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2010,8 +2010,8 @@ struct valkeyServer {
* use dual channel replication for full syncs. */
int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel
* to establish psync. */
int debug_sleep_after_fork_us; /* Debug param that force the main process to
* sleep for N microseconds after fork() in repl. */
int debug_pause_after_fork; /* Debug param that pauses the main process
* after a replication fork() (for bgsave). */
size_t repl_buffer_mem; /* The memory of replication buffer. */
list *repl_buffer_blocks; /* Replication buffers blocks list
* (serving replica clients and repl backlog) */
Expand Down Expand Up @@ -3986,6 +3986,7 @@ void killThreads(void);
void makeThreadKillable(void);
void swapMainDbWithTempDb(serverDb *tempDb);
sds getVersion(void);
void debugPauseProcess(void);

/* Use macro for checking log level to avoid evaluating arguments in cases log
* should be ignored due to low level. */
Expand Down
10 changes: 0 additions & 10 deletions tests/helpers/bg_server_sleep.tcl

This file was deleted.

Loading

0 comments on commit 48ca2c9

Please sign in to comment.