diff --git a/src/replication.c b/src/replication.c index 69ec0adbb6..55ddd73efd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -55,7 +55,6 @@ void setupMainConnForPsync(connection *conn); void completeTaskRDBChannelSync(connection *conn); int isReplicaMainChannel(client *c); int isReplicaRdbChannel(client *c); -int isOngoingRdbChannelSync(void); void replicationAbortSyncTransfer(void); /* We take a global flag to remember if this instance generated an RDB @@ -1226,7 +1225,7 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_EOF; else if (!strcasecmp(c->argv[j+1]->ptr,"psync2")) c->slave_capa |= SLAVE_CAPA_PSYNC2; - } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { + } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { /* REPLCONF ACK is used by slave to inform the master the amount * of replication stream that it processed so far. It is an * internal only command that normal clients should never use. */ @@ -1798,18 +1797,8 @@ void shiftReplicationId(void) { /* Returns 1 if the given replication state is a handshake state, * 0 otherwise. */ int slaveIsInHandshakeState(void) { - return (server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY && - server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY) || - (server.repl_state >= REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY && - server.repl_state <= REPL_RDB_CONN_RECEIVE_ENDOFF); -} - -/* Returns 1 if the given replication state is a rdb transfer state, - * 0 otherwise. */ -int slaveIsInTransferState(void) { - return server.repl_state == REPL_STATE_TRANSFER || - (server.repl_state >= REPL_RDB_CONN_SEND_PSYNC && - server.repl_state <= REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE); + return server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY && + server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY; } /* Avoid the master to detect the slave is timing out while loading the @@ -1961,12 +1950,6 @@ int isReplicaRdbChannel(client *c) { return (c->flags & CLIENT_REPL_RDB_CHANNEL) != 0; } -/* Used on replica side to check if rdb-channel sync is currently in progress */ -int isOngoingRdbChannelSync(void) { - return server.repl_state >= REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY && - server.repl_state <= REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE; -} - /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(connection *conn) { @@ -2369,7 +2352,7 @@ void readSyncBulkPayload(connection *conn) { } else { replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; -/* Send the initial ACK immediately to put this replica in online state. */ + /* Send the initial ACK immediately to put this replica in online state. */ replicationSendAck(); } server.repl_down_since = 0; @@ -2548,7 +2531,7 @@ int reInitReplicaMainConnection(connection *conn) { serverAssert(conn == server.repl_transfer_s); /* Init the main connection for psync */ conn->state = CONN_STATE_CONNECTED; - server.repl_state = REPL_RDB_CONN_SEND_PSYNC; + server.repl_state = REPL_STATE_SEND_PSYNC; serverAssert(connSetReadHandler(server.repl_provisional_master.conn, setupMainConnForPsync) != C_ERR); setupMainConnForPsync(conn); return C_OK; @@ -2602,7 +2585,7 @@ void fullSyncWithMaster(connection* conn) { } /* Send replica capabilities */ - if (server.repl_state == REPL_RDB_CONN_SEND_CAPA) { + if (server.repl_rdb_conn_state == REPL_RDB_CONN_SEND_CAPA) { serverLog(LL_DEBUG, "Received first reply from primary using rdb connection. Sending capa"); /* Send replica lisening port to master for clarification */ @@ -2611,7 +2594,7 @@ void fullSyncWithMaster(connection* conn) { "rdb-only", "1", "rdb-conn", "1", "listening-port", portstr, NULL); sdsfree(portstr); if (err) goto write_error; - server.repl_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY; + server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY; if (connSetReadHandler(conn, fullSyncWithMaster) == C_ERR) { char conninfo[CONN_INFO_LEN]; @@ -2623,7 +2606,7 @@ void fullSyncWithMaster(connection* conn) { return; } /* Receive replconf response */ - if (server.repl_state == REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY) { + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; @@ -2640,11 +2623,11 @@ void fullSyncWithMaster(connection* conn) { goto error; } - server.repl_state = REPL_RDB_CONN_RECEIVE_ENDOFF; + server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_ENDOFF; return; } /* Receive master rdb-channel end offset response */ - if (server.repl_state == REPL_RDB_CONN_RECEIVE_ENDOFF) { + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_ENDOFF) { char buf[PROTO_IOBUF_LEN]; connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000); if (buf[0] == '\0') { @@ -2827,9 +2810,9 @@ void streamReplDataBufToDb(client *c) { void completeTaskRDBChannelSync(connection *conn) { if (conn == server.repl_transfer_s) { /* Main connection */ - if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY && server.repl_rdb_transfer_s != NULL) { + if (server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY && server.repl_rdb_conn_state < REPL_RDB_CONN_RDB_LOADED) { /* RDB is still loading */ - server.repl_state = REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE; + server.repl_state = REPL_STATE_TRANSFER; if (connSetReadHandler(server.repl_provisional_master.conn, bufferReplData)) { serverLog(LL_WARNING,"Error while setting readable handler: %s", strerror(errno)); abortRdbConnectionSync(); @@ -2837,7 +2820,7 @@ void completeTaskRDBChannelSync(connection *conn) { replDataBufInit(); return; } - if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY && server.repl_rdb_transfer_s == NULL) { + if (server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY && server.repl_rdb_conn_state == REPL_RDB_CONN_RDB_LOADED) { /* RDB is loaded */ serverLog(LL_DEBUG, "RDB channel sync - psync established after rdb load"); goto sync_success; @@ -2846,11 +2829,13 @@ void completeTaskRDBChannelSync(connection *conn) { } if (conn == server.repl_rdb_transfer_s) { /* RDB connection */ - if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY) { - /* Main psync connection hasn't been established yet, exit without changing the state */ + if (server.repl_state < REPL_STATE_TRANSFER) { + /* Main psync connection hasn't been established yet */ + server.repl_rdb_conn_state = REPL_RDB_CONN_RDB_LOADED; return; } - if (server.repl_state == REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE) { + if (server.repl_state == REPL_STATE_TRANSFER) { + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; connSetReadHandler(server.repl_transfer_s, NULL); goto sync_success; } @@ -2939,7 +2924,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { server.master_initial_offset = -1; - if (isOngoingRdbChannelSync()) { + if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { // rdb-channel sync is in progress /* While in rdb-channel-sync, we should use our prepared repl id and offset. */ psync_replid = server.repl_provisional_master.replid; snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_master.reploff+1); @@ -3022,7 +3007,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { } if (!strncmp(reply,"+CONTINUE",9)) { - if (isOngoingRdbChannelSync()) { + if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { /* During rdb-sync sesseion, master struct is already initilized. */ return PSYNC_CONTINUE; } @@ -3113,12 +3098,12 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { void setupMainConnForPsync(connection *conn) { int psync_result; - if (server.repl_state == REPL_RDB_CONN_SEND_PSYNC) { + if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { serverLog(LL_WARNING, "Aborting RDB connection sync. Write error."); abortRdbConnectionSync(); } - server.repl_state = REPL_RDB_CONN_RECEIVE_PSYNC_REPLY; + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } psync_result = slaveTryPartialResynchronization(conn,1); @@ -3419,12 +3404,6 @@ void syncWithMaster(connection *conn) { } } - /* Using rdb-channel sync, the master responded -FULLSYNCNEEDED. We need to - * initialize the RDB channel. */ - if (psync_result == PSYNC_FULLRESYNC_RDB_CONN) { - server.repl_state = REPL_RDB_CONN_SEND_CAPA; - } - /* Prepare a suitable temp file for bulk transfer */ if (!useDisklessLoad()) { while(maxtries--) { @@ -3442,7 +3421,9 @@ void syncWithMaster(connection *conn) { server.repl_transfer_fd = dfd; } - if (server.master_supports_rdb_channel > 0) { + /* Using rdb-channel sync, the master responded -FULLSYNCNEEDED. We need to + * initialize the RDB channel. */ + if (psync_result == PSYNC_FULLRESYNC_RDB_CONN) { /* Create a full sync connection */ server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); if (connConnect(server.repl_rdb_transfer_s, server.masterhost, server.masterport, @@ -3459,7 +3440,8 @@ void syncWithMaster(connection *conn) { "Can't clear main connection handler: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; - } + } + server.repl_rdb_conn_state = REPL_RDB_CONN_SEND_CAPA; return; } /* Setup the non blocking download of the bulk file. */ @@ -3547,7 +3529,7 @@ void undoConnectWithMaster(void) { * Never call this function directly, use cancelReplicationHandshake() instead. */ void replicationAbortSyncTransfer(void) { - serverAssert(slaveIsInTransferState()); + serverAssert(server.repl_state == REPL_STATE_TRANSFER); undoConnectWithMaster(); if (server.repl_transfer_fd!=-1) { close(server.repl_transfer_fd); @@ -3567,7 +3549,7 @@ void replicationAbortSyncTransfer(void) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(int reconnect) { - if (slaveIsInTransferState()) { + if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; } else if (server.repl_state == REPL_STATE_CONNECTING || @@ -3839,7 +3821,6 @@ void roleCommand(client *c) { case REPL_STATE_NONE: slavestate = "none"; break; case REPL_STATE_CONNECT: slavestate = "connect"; break; case REPL_STATE_CONNECTING: slavestate = "connecting"; break; - case REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE: slavestate = "rdb-channel-sync"; break; case REPL_STATE_TRANSFER: slavestate = "sync"; break; case REPL_STATE_CONNECTED: slavestate = "connected"; break; default: slavestate = "unknown"; break; diff --git a/src/server.c b/src/server.c index 848526a707..b18cb842ec 100644 --- a/src/server.c +++ b/src/server.c @@ -2053,6 +2053,7 @@ void initServerConfig(void) { server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; server.repl_transfer_tmpfile = NULL; server.repl_transfer_fd = -1; server.repl_transfer_s = NULL; diff --git a/src/server.h b/src/server.h index d0a62c3894..4908ca9d0f 100644 --- a/src/server.h +++ b/src/server.h @@ -453,16 +453,16 @@ typedef enum { REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ - /* --- RDB channel related states --- */ + REPL_STATE_CONNECTED, /* Connected to master */ +} repl_state; + +typedef enum { + REPL_RDB_CONN_STATE_NONE = 0, /* No active replication */ REPL_RDB_CONN_SEND_CAPA, /* Send replica cob-channel capabilities */ REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */ REPL_RDB_CONN_RECEIVE_ENDOFF, /* Wait for $ENDOFF reply */ - REPL_RDB_CONN_SEND_PSYNC, /* Same as REPL_STATE_SEND_PSYNC but during RDB load */ - REPL_RDB_CONN_RECEIVE_PSYNC_REPLY, /* Same as REPL_STATE_RECEIVE_PSYNC_REPLY but during RDB load */ - REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE, /* Replica's main connection is partial syncing while it's RDB connection loading the RDB */ - /* --- Replication steady state ---*/ - REPL_STATE_CONNECTED, /* Connected to master */ -} repl_state; + REPL_RDB_CONN_RDB_LOADED, +} repl_rdb_conn_state; /* The state of an in progress coordinated failover */ typedef enum { @@ -1910,6 +1910,7 @@ struct redisServer { client *cached_master; /* Cached master to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a slave */ + int repl_rdb_conn_state; /* State of the replica's rdb channel during rdb-channel sync */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */