Skip to content

Commit

Permalink
Rename repl_data_buf to pending_repl_data
Browse files Browse the repository at this point in the history
Fix memory leak
Added void param to isOngoingRdbChannelSync
  • Loading branch information
naglera committed May 4, 2023
1 parent 9d464e2 commit d976a81
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/adlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void listUnlinkNode(list *list, listNode *node) {
/*
* Remove the node and return the next one in the list
*/
listNode *listDelNodeAndNext(list* list, listNode* node) {
listNode *listRemoveAndGetNext(list* list, listNode* node) {
if (node == NULL) {
return listFirst(list);
}
Expand Down
2 changes: 1 addition & 1 deletion src/adlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ list *listAddNodeHead(list *list, void *value);
list *listAddNodeTail(list *list, void *value);
list *listInsertNode(list *list, listNode *old_node, void *value, int after);
void listDelNode(list *list, listNode *node);
listNode *listDelNodeAndNext(list* list, listNode* node);
listNode *listRemoveAndGetNext(list* list, listNode* node);
listIter *listGetIterator(list *list, int direction);
listNode *listNext(listIter *iter);
void listReleaseIterator(listIter *iter);
Expand Down
42 changes: 23 additions & 19 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2494,11 +2494,11 @@ sds getReplicaPortString() {
}

void freeReplDataBuf() {
if (server.repl_data_buf.blocks) {
listRelease(server.repl_data_buf.blocks);
if (server.pending_repl_data.blocks) {
listRelease(server.pending_repl_data.blocks);
}
server.repl_data_buf.blocks = NULL;
server.repl_data_buf.len = 0;
server.pending_repl_data.blocks = NULL;
server.pending_repl_data.len = 0;
}

void abortRdbConnectionSync(int should_retry) {
Expand Down Expand Up @@ -2669,12 +2669,12 @@ void fullSyncWithMaster(connection* conn) {
}


/* Initialize server.repl_data_buf infrastructure, we will allocate the buffer itself once we need it */
/* Initialize server.pending_repl_data infrastructure, we will allocate the buffer itself once we need it */
void replDataBufInit() {
serverAssert(server.repl_data_buf.blocks == NULL);
server.repl_data_buf.len = 0;
server.repl_data_buf.blocks = listCreate();
server.repl_data_buf.blocks->free = zfree;
serverAssert(server.pending_repl_data.blocks == NULL);
server.pending_repl_data.len = 0;
server.pending_repl_data.blocks = listCreate();
server.pending_repl_data.blocks->free = zfree;
}

/* Track replication data streaming progress, and serve clients from time to time */
Expand All @@ -2696,7 +2696,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) {
serverLog(LL_VERBOSE, "Error reading from primary: %s",connGetLastError(conn));
abortRdbConnectionSync(1);
}
return -1;
return C_ERR;
}
if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
Expand All @@ -2705,35 +2705,35 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) {
sdsfree(info);
}
abortRdbConnectionSync(1);
return -1;
return C_ERR;
}
o->used += nread;
incrReadsProcessed(nread);
return read - nread;
}

int isReplicaBufferLimitReached() {
return server.repl_data_buf.len > server.client_obuf_limits[1].hard_limit_bytes;
return server.pending_repl_data.len > server.client_obuf_limits[1].hard_limit_bytes;
}

/* This connection handler buffers incoming steady-state replication data while downloading and loading the
* RDB. As soon as the RDB is loaded, the data will be streamed into the database in streamReplDataBufToDb(). */
/* Read handler for buffering incoming repl data during RDB download/loading. */
void bufferReplData(connection *conn) {
client *c = connGetPrivateData(conn);
size_t readlen = PROTO_IOBUF_LEN;
int read = 0;

while (readlen > 0) {
listNode *ln = listLast(server.repl_data_buf.blocks);
listNode *ln = listLast(server.pending_repl_data.blocks);
replDataBufBlock *tail = ln ? listNodeValue(ln) : NULL;

/* Append to tail string when possible */
if (tail && tail->used < tail->size) {
size_t avail = tail->size - tail->used;
read = min(readlen, avail);
readlen -= read;
read = readIntoReplDataBlock(conn, tail, read);
}
if (readlen && read <= 0) {
if (readlen && read == 0) {
if (isReplicaBufferLimitReached()) {
serverLog(LL_DEBUG, "Replication buffer limit reached, stopping buffering");
break;
Expand All @@ -2742,16 +2742,20 @@ void bufferReplData(connection *conn) {
tail = zmalloc(sizeof(replDataBufBlock));
tail->size = PRIMARY_REPL_BUF_BLOCK_SIZE;
tail->used = 0;
listAddNodeTail(server.repl_data_buf.blocks, tail);
server.repl_data_buf.len += tail->size;
listAddNodeTail(server.pending_repl_data.blocks, tail);
server.pending_repl_data.len += tail->size;

read = min(readlen, tail->size);
readlen -= read;
read = readIntoReplDataBlock(conn, tail, read);
}
if (read > 0) {
/* Stop reading in case we read less than we anticipated */
break;
}
if (read == C_ERR) {
return;
}
}
c->lastinteraction = server.unixtime;
}
Expand All @@ -2763,7 +2767,7 @@ void streamReplDataBufToDb(client *c) {
size_t offset = 0;
listNode *cur = NULL;

while ((cur = listDelNodeAndNext(server.repl_data_buf.blocks, cur))) {
while ((cur = listRemoveAndGetNext(server.pending_repl_data.blocks, cur))) {
/* Read and process repl data block */
replDataBufBlock *o = listNodeValue(cur);
c->querybuf = sdscatlen(c->querybuf, o->buf, o->used);
Expand Down
4 changes: 2 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ int isReplicaRdbChannel(client *c) {
}

/* Used on replica side to check if rdb-channel sync is currently in progress */
int isOngoingRdbChannelSync() {
int isOngoingRdbChannelSync(void) {
return server.repl_state >= REPL_SEC_CONN_RECEIVE_REPLCONF_REPLY &&
server.repl_state <= REPL_SEC_CONN_TWO_CONNECTIONS_ACTIVE;
}
Expand Down Expand Up @@ -5697,7 +5697,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
freeMemoryGetNotCountedMemory(),
mh->repl_backlog,
server.repl_buffer_mem,
server.repl_data_buf.len,
server.pending_repl_data.len,
mh->clients_slaves,
mh->clients_normal,
mh->cluster_links,
Expand Down
6 changes: 2 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1872,7 +1872,7 @@ struct redisServer {
struct { /* Replication data buffer for rdb-channel sync */
list *blocks; /* list of replDataBufBlock */
size_t len;
} repl_data_buf;
} pending_repl_data;
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
Expand Down Expand Up @@ -2816,8 +2816,6 @@ void replicationStartPendingFork(void);
void replicationHandleMasterDisconnection(void);
void replicationCacheMaster(client *c);
void resizeReplicationBacklog(void);
void setReplicaBufferLimit();
void resizeReplicationBuffer(long long new_size);
void replicationSetMaster(char *ip, int port);
void replicationUnsetMaster(void);
void refreshGoodSlavesCount(void);
Expand Down Expand Up @@ -2849,7 +2847,7 @@ void abortFailover(const char *err);
const char *getFailoverStateString(void);
int isReplicaPsyncChannel(client *c);
int isReplicaRdbChannel(client *c);
int isOngoingRdbChannelSync();
int isOngoingRdbChannelSync(void);
void abortRdbConnectionSync(int should_retry);
void incrReadsProcessed(size_t nread);

Expand Down

0 comments on commit d976a81

Please sign in to comment.