Skip to content

Commit

Permalink
Move sendCurentOffsetToReplica to replication.c
Browse files Browse the repository at this point in the history
  • Loading branch information
naglera committed Jan 4, 2024
1 parent 03d35b6 commit be1f66f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
15 changes: 0 additions & 15 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
int sendCurentOffsetToReplica(client* replica);

#ifdef __GNUC__
void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__ ((format (printf, 3, 4)));
Expand Down Expand Up @@ -3500,20 +3499,6 @@ void killRDBChild(void) {
* - rdbRemoveTempFile */
}

/* Send to replica End Offset response with structure
* $ENDOFF:<end-offset> <primary-repl-id> <current-db-id> */
int sendCurentOffsetToReplica(client* replica) {
char buf[128];
int buflen;
buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d\r\n", server.master_repl_offset, server.replid, server.db->id);
serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld", replicationGetSlaveName(replica), server.master_repl_offset);
if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout*1000) != buflen) {
freeClientAsync(replica);
return C_ERR;
}
return C_OK;
}

/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
Expand Down
14 changes: 14 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2553,6 +2553,20 @@ int prepareRdbConnectionForRdbLoad(connection *conn) {
return C_OK;
}

/* Send to replica End Offset response with structure
* $ENDOFF:<end-offset> <primary-repl-id> <current-db-id> */
int sendCurentOffsetToReplica(client* replica) {
char buf[128];
int buflen;
buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d\r\n", server.master_repl_offset, server.replid, server.db->id);
serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld", replicationGetSlaveName(replica), server.master_repl_offset);
if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout*1000) != buflen) {
freeClientAsync(replica);
return C_ERR;
}
return C_OK;
}

int processEndOffsetResponse(char* response) {
long long reploffset;
char master_replid[CONFIG_RUN_ID_SIZE+1];
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2846,6 +2846,7 @@ int isReplicaRdbChannel(client *c);
int isOngoingRdbChannelSync(void);
void abortRdbConnectionSync(int should_retry);
void incrReadsProcessed(size_t nread);
int sendCurentOffsetToReplica(client* replica);

/* Generic persistence functions */
void startLoadingFile(size_t size, char* filename, int rdbflags);
Expand Down

0 comments on commit be1f66f

Please sign in to comment.