Skip to content

Commit

Permalink
Simplfy replica state machine by spliting replica state
Browse files Browse the repository at this point in the history
server.repl_state will be used to represent the main conneciton state.
server.repl_rdb_conn_state is introduced to represent the rdb-channel
state.

Main channel can reuse REPL_STATE_SEND_PSYNC,
REPL_STATE_RECEIVE_PSYNC_REPLY, REPL_STATE_TRANSFER after rdb-channel
initialization.
  • Loading branch information
naglera committed Feb 7, 2024
1 parent ed6d69a commit d25c36e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 55 deletions.
77 changes: 29 additions & 48 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ void setupMainConnForPsync(connection *conn);
void completeTaskRDBChannelSync(connection *conn);
int isReplicaMainChannel(client *c);
int isReplicaRdbChannel(client *c);
int isOngoingRdbChannelSync(void);
void replicationAbortSyncTransfer(void);

/* We take a global flag to remember if this instance generated an RDB
Expand Down Expand Up @@ -1226,7 +1225,7 @@ void replconfCommand(client *c) {
c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
* internal only command that normal clients should never use. */
Expand Down Expand Up @@ -1798,18 +1797,8 @@ void shiftReplicationId(void) {
/* Returns 1 if the given replication state is a handshake state,
* 0 otherwise. */
int slaveIsInHandshakeState(void) {
return (server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY &&
server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY) ||
(server.repl_state >= REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY &&
server.repl_state <= REPL_RDB_CONN_RECEIVE_ENDOFF);
}

/* Returns 1 if the given replication state is a rdb transfer state,
* 0 otherwise. */
int slaveIsInTransferState(void) {
return server.repl_state == REPL_STATE_TRANSFER ||
(server.repl_state >= REPL_RDB_CONN_SEND_PSYNC &&
server.repl_state <= REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE);
return server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY &&
server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY;
}

/* Avoid the master to detect the slave is timing out while loading the
Expand Down Expand Up @@ -1961,12 +1950,6 @@ int isReplicaRdbChannel(client *c) {
return (c->flags & CLIENT_REPL_RDB_CHANNEL) != 0;
}

/* Used on replica side to check if rdb-channel sync is currently in progress */
int isOngoingRdbChannelSync(void) {
return server.repl_state >= REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY &&
server.repl_state <= REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE;
}

/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) {
Expand Down Expand Up @@ -2369,7 +2352,7 @@ void readSyncBulkPayload(connection *conn) {
} else {
replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
/* Send the initial ACK immediately to put this replica in online state. */
/* Send the initial ACK immediately to put this replica in online state. */
replicationSendAck();
}
server.repl_down_since = 0;
Expand Down Expand Up @@ -2548,7 +2531,7 @@ int reInitReplicaMainConnection(connection *conn) {
serverAssert(conn == server.repl_transfer_s);
/* Init the main connection for psync */
conn->state = CONN_STATE_CONNECTED;
server.repl_state = REPL_RDB_CONN_SEND_PSYNC;
server.repl_state = REPL_STATE_SEND_PSYNC;
serverAssert(connSetReadHandler(server.repl_provisional_master.conn, setupMainConnForPsync) != C_ERR);
setupMainConnForPsync(conn);
return C_OK;
Expand Down Expand Up @@ -2602,7 +2585,7 @@ void fullSyncWithMaster(connection* conn) {
}

/* Send replica capabilities */
if (server.repl_state == REPL_RDB_CONN_SEND_CAPA) {
if (server.repl_rdb_conn_state == REPL_RDB_CONN_SEND_CAPA) {
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection. Sending capa");

/* Send replica lisening port to master for clarification */
Expand All @@ -2611,7 +2594,7 @@ void fullSyncWithMaster(connection* conn) {
"rdb-only", "1", "rdb-conn", "1", "listening-port", portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
server.repl_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY;
server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY;

if (connSetReadHandler(conn, fullSyncWithMaster) == C_ERR) {
char conninfo[CONN_INFO_LEN];
Expand All @@ -2623,7 +2606,7 @@ void fullSyncWithMaster(connection* conn) {
return;
}
/* Receive replconf response */
if (server.repl_state == REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY) {
if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;

Expand All @@ -2640,11 +2623,11 @@ void fullSyncWithMaster(connection* conn) {
goto error;
}

server.repl_state = REPL_RDB_CONN_RECEIVE_ENDOFF;
server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_ENDOFF;
return;
}
/* Receive master rdb-channel end offset response */
if (server.repl_state == REPL_RDB_CONN_RECEIVE_ENDOFF) {
if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_ENDOFF) {
char buf[PROTO_IOBUF_LEN];
connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000);
if (buf[0] == '\0') {
Expand Down Expand Up @@ -2827,17 +2810,17 @@ void streamReplDataBufToDb(client *c) {
void completeTaskRDBChannelSync(connection *conn) {
if (conn == server.repl_transfer_s) {
/* Main connection */
if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY && server.repl_rdb_transfer_s != NULL) {
if (server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY && server.repl_rdb_conn_state < REPL_RDB_CONN_RDB_LOADED) {
/* RDB is still loading */
server.repl_state = REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE;
server.repl_state = REPL_STATE_TRANSFER;
if (connSetReadHandler(server.repl_provisional_master.conn, bufferReplData)) {
serverLog(LL_WARNING,"Error while setting readable handler: %s", strerror(errno));
abortRdbConnectionSync();
}
replDataBufInit();
return;
}
if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY && server.repl_rdb_transfer_s == NULL) {
if (server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY && server.repl_rdb_conn_state == REPL_RDB_CONN_RDB_LOADED) {
/* RDB is loaded */
serverLog(LL_DEBUG, "RDB channel sync - psync established after rdb load");
goto sync_success;
Expand All @@ -2846,11 +2829,13 @@ void completeTaskRDBChannelSync(connection *conn) {
}
if (conn == server.repl_rdb_transfer_s) {
/* RDB connection */
if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY) {
/* Main psync connection hasn't been established yet, exit without changing the state */
if (server.repl_state < REPL_STATE_TRANSFER) {
/* Main psync connection hasn't been established yet */
server.repl_rdb_conn_state = REPL_RDB_CONN_RDB_LOADED;
return;
}
if (server.repl_state == REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE) {
if (server.repl_state == REPL_STATE_TRANSFER) {
server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE;
connSetReadHandler(server.repl_transfer_s, NULL);
goto sync_success;
}
Expand Down Expand Up @@ -2939,7 +2924,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
server.master_initial_offset = -1;


if (isOngoingRdbChannelSync()) {
if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { // rdb-channel sync is in progress
/* While in rdb-channel-sync, we should use our prepared repl id and offset. */
psync_replid = server.repl_provisional_master.replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_master.reploff+1);
Expand Down Expand Up @@ -3022,7 +3007,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
}

if (!strncmp(reply,"+CONTINUE",9)) {
if (isOngoingRdbChannelSync()) {
if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) {
/* During rdb-sync sesseion, master struct is already initilized. */
return PSYNC_CONTINUE;
}
Expand Down Expand Up @@ -3113,12 +3098,12 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {

void setupMainConnForPsync(connection *conn) {
int psync_result;
if (server.repl_state == REPL_RDB_CONN_SEND_PSYNC) {
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting RDB connection sync. Write error.");
abortRdbConnectionSync();
}
server.repl_state = REPL_RDB_CONN_RECEIVE_PSYNC_REPLY;
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
}
psync_result = slaveTryPartialResynchronization(conn,1);
Expand Down Expand Up @@ -3419,12 +3404,6 @@ void syncWithMaster(connection *conn) {
}
}

/* Using rdb-channel sync, the master responded -FULLSYNCNEEDED. We need to
* initialize the RDB channel. */
if (psync_result == PSYNC_FULLRESYNC_RDB_CONN) {
server.repl_state = REPL_RDB_CONN_SEND_CAPA;
}

/* Prepare a suitable temp file for bulk transfer */
if (!useDisklessLoad()) {
while(maxtries--) {
Expand All @@ -3442,7 +3421,9 @@ void syncWithMaster(connection *conn) {
server.repl_transfer_fd = dfd;
}

if (server.master_supports_rdb_channel > 0) {
/* Using rdb-channel sync, the master responded -FULLSYNCNEEDED. We need to
* initialize the RDB channel. */
if (psync_result == PSYNC_FULLRESYNC_RDB_CONN) {
/* Create a full sync connection */
server.repl_rdb_transfer_s = connCreate(connTypeOfReplication());
if (connConnect(server.repl_rdb_transfer_s, server.masterhost, server.masterport,
Expand All @@ -3459,7 +3440,8 @@ void syncWithMaster(connection *conn) {
"Can't clear main connection handler: %s (%s)",
strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}
}
server.repl_rdb_conn_state = REPL_RDB_CONN_SEND_CAPA;
return;
}
/* Setup the non blocking download of the bulk file. */
Expand Down Expand Up @@ -3547,7 +3529,7 @@ void undoConnectWithMaster(void) {
* Never call this function directly, use cancelReplicationHandshake() instead.
*/
void replicationAbortSyncTransfer(void) {
serverAssert(slaveIsInTransferState());
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
if (server.repl_transfer_fd!=-1) {
close(server.repl_transfer_fd);
Expand All @@ -3567,7 +3549,7 @@ void replicationAbortSyncTransfer(void) {
*
* Otherwise zero is returned and no operation is performed at all. */
int cancelReplicationHandshake(int reconnect) {
if (slaveIsInTransferState()) {
if (server.repl_state == REPL_STATE_TRANSFER) {
replicationAbortSyncTransfer();
server.repl_state = REPL_STATE_CONNECT;
} else if (server.repl_state == REPL_STATE_CONNECTING ||
Expand Down Expand Up @@ -3839,7 +3821,6 @@ void roleCommand(client *c) {
case REPL_STATE_NONE: slavestate = "none"; break;
case REPL_STATE_CONNECT: slavestate = "connect"; break;
case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
case REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE: slavestate = "rdb-channel-sync"; break;
case REPL_STATE_TRANSFER: slavestate = "sync"; break;
case REPL_STATE_CONNECTED: slavestate = "connected"; break;
default: slavestate = "unknown"; break;
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,7 @@ void initServerConfig(void) {
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE;
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = NULL;
Expand Down
15 changes: 8 additions & 7 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,16 @@ typedef enum {
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
/* --- RDB channel related states --- */
REPL_STATE_CONNECTED, /* Connected to master */
} repl_state;

typedef enum {
REPL_RDB_CONN_STATE_NONE = 0, /* No active replication */
REPL_RDB_CONN_SEND_CAPA, /* Send replica cob-channel capabilities */
REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */
REPL_RDB_CONN_RECEIVE_ENDOFF, /* Wait for $ENDOFF reply */
REPL_RDB_CONN_SEND_PSYNC, /* Same as REPL_STATE_SEND_PSYNC but during RDB load */
REPL_RDB_CONN_RECEIVE_PSYNC_REPLY, /* Same as REPL_STATE_RECEIVE_PSYNC_REPLY but during RDB load */
REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE, /* Replica's main connection is partial syncing while it's RDB connection loading the RDB */
/* --- Replication steady state ---*/
REPL_STATE_CONNECTED, /* Connected to master */
} repl_state;
REPL_RDB_CONN_RDB_LOADED,
} repl_rdb_conn_state;

/* The state of an in progress coordinated failover */
typedef enum {
Expand Down Expand Up @@ -1910,6 +1910,7 @@ struct redisServer {
client *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
int repl_rdb_conn_state; /* State of the replica's rdb channel during rdb-channel sync */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
Expand Down

0 comments on commit d25c36e

Please sign in to comment.