diff --git a/src/adlist.c b/src/adlist.c index 1ab48a8383..984018d291 100644 --- a/src/adlist.c +++ b/src/adlist.c @@ -208,7 +208,7 @@ void listUnlinkNode(list *list, listNode *node) { /* * Remove the node and return the next one in the list */ -listNode *listDelNodeAndNext(list* list, listNode* node) { +listNode *listRemoveAndGetNext(list* list, listNode* node) { if (node == NULL) { return listFirst(list); } diff --git a/src/adlist.h b/src/adlist.h index 7889a3d204..1b34bf2204 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -77,7 +77,7 @@ list *listAddNodeHead(list *list, void *value); list *listAddNodeTail(list *list, void *value); list *listInsertNode(list *list, listNode *old_node, void *value, int after); void listDelNode(list *list, listNode *node); -listNode *listDelNodeAndNext(list* list, listNode* node); +listNode *listRemoveAndGetNext(list* list, listNode* node); listIter *listGetIterator(list *list, int direction); listNode *listNext(listIter *iter); void listReleaseIterator(listIter *iter); diff --git a/src/replication.c b/src/replication.c index 4fee9c3143..91d3deac40 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2494,11 +2494,11 @@ sds getReplicaPortString() { } void freeReplDataBuf() { - if (server.repl_data_buf.blocks) { - listRelease(server.repl_data_buf.blocks); + if (server.pending_repl_data.blocks) { + listRelease(server.pending_repl_data.blocks); } - server.repl_data_buf.blocks = NULL; - server.repl_data_buf.len = 0; + server.pending_repl_data.blocks = NULL; + server.pending_repl_data.len = 0; } void abortRdbConnectionSync(int should_retry) { @@ -2669,12 +2669,12 @@ void fullSyncWithMaster(connection* conn) { } -/* Initialize server.repl_data_buf infrastructure, we will allocate the buffer itself once we need it */ +/* Initialize server.pending_repl_data infrastructure, we will allocate the buffer itself once we need it */ void replDataBufInit() { - serverAssert(server.repl_data_buf.blocks == NULL); - server.repl_data_buf.len = 0; - server.repl_data_buf.blocks = listCreate(); - server.repl_data_buf.blocks->free = zfree; + serverAssert(server.pending_repl_data.blocks == NULL); + server.pending_repl_data.len = 0; + server.pending_repl_data.blocks = listCreate(); + server.pending_repl_data.blocks->free = zfree; } /* Track replication data streaming progress, and serve clients from time to time */ @@ -2696,7 +2696,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { serverLog(LL_VERBOSE, "Error reading from primary: %s",connGetLastError(conn)); abortRdbConnectionSync(1); } - return -1; + return C_ERR; } if (nread == 0) { if (server.verbosity <= LL_VERBOSE) { @@ -2705,7 +2705,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { sdsfree(info); } abortRdbConnectionSync(1); - return -1; + return C_ERR; } o->used += nread; incrReadsProcessed(nread); @@ -2713,27 +2713,27 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { } int isReplicaBufferLimitReached() { - return server.repl_data_buf.len > server.client_obuf_limits[1].hard_limit_bytes; + return server.pending_repl_data.len > server.client_obuf_limits[1].hard_limit_bytes; } -/* This connection handler buffers incoming steady-state replication data while downloading and loading the - * RDB. As soon as the RDB is loaded, the data will be streamed into the database in streamReplDataBufToDb(). */ +/* Read handler for buffering incoming repl data during RDB download/loading. */ void bufferReplData(connection *conn) { client *c = connGetPrivateData(conn); size_t readlen = PROTO_IOBUF_LEN; int read = 0; while (readlen > 0) { - listNode *ln = listLast(server.repl_data_buf.blocks); + listNode *ln = listLast(server.pending_repl_data.blocks); replDataBufBlock *tail = ln ? listNodeValue(ln) : NULL; /* Append to tail string when possible */ if (tail && tail->used < tail->size) { size_t avail = tail->size - tail->used; read = min(readlen, avail); + readlen -= read; read = readIntoReplDataBlock(conn, tail, read); } - if (readlen && read <= 0) { + if (readlen && read == 0) { if (isReplicaBufferLimitReached()) { serverLog(LL_DEBUG, "Replication buffer limit reached, stopping buffering"); break; @@ -2742,16 +2742,20 @@ void bufferReplData(connection *conn) { tail = zmalloc(sizeof(replDataBufBlock)); tail->size = PRIMARY_REPL_BUF_BLOCK_SIZE; tail->used = 0; - listAddNodeTail(server.repl_data_buf.blocks, tail); - server.repl_data_buf.len += tail->size; + listAddNodeTail(server.pending_repl_data.blocks, tail); + server.pending_repl_data.len += tail->size; read = min(readlen, tail->size); + readlen -= read; read = readIntoReplDataBlock(conn, tail, read); } if (read > 0) { /* Stop reading in case we read less than we anticipated */ break; } + if (read == C_ERR) { + return; + } } c->lastinteraction = server.unixtime; } @@ -2763,7 +2767,7 @@ void streamReplDataBufToDb(client *c) { size_t offset = 0; listNode *cur = NULL; - while ((cur = listDelNodeAndNext(server.repl_data_buf.blocks, cur))) { + while ((cur = listRemoveAndGetNext(server.pending_repl_data.blocks, cur))) { /* Read and process repl data block */ replDataBufBlock *o = listNodeValue(cur); c->querybuf = sdscatlen(c->querybuf, o->buf, o->used); diff --git a/src/server.c b/src/server.c index 461176877c..b53bbfa71c 100644 --- a/src/server.c +++ b/src/server.c @@ -703,7 +703,7 @@ int isReplicaRdbChannel(client *c) { } /* Used on replica side to check if rdb-channel sync is currently in progress */ -int isOngoingRdbChannelSync() { +int isOngoingRdbChannelSync(void) { return server.repl_state >= REPL_SEC_CONN_RECEIVE_REPLCONF_REPLY && server.repl_state <= REPL_SEC_CONN_TWO_CONNECTIONS_ACTIVE; } @@ -5697,7 +5697,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { freeMemoryGetNotCountedMemory(), mh->repl_backlog, server.repl_buffer_mem, - server.repl_data_buf.len, + server.pending_repl_data.len, mh->clients_slaves, mh->clients_normal, mh->cluster_links, diff --git a/src/server.h b/src/server.h index e23ef16dc0..9d76d07df7 100644 --- a/src/server.h +++ b/src/server.h @@ -1872,7 +1872,7 @@ struct redisServer { struct { /* Replication data buffer for rdb-channel sync */ list *blocks; /* list of replDataBufBlock */ size_t len; - } repl_data_buf; + } pending_repl_data; time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -2816,8 +2816,6 @@ void replicationStartPendingFork(void); void replicationHandleMasterDisconnection(void); void replicationCacheMaster(client *c); void resizeReplicationBacklog(void); -void setReplicaBufferLimit(); -void resizeReplicationBuffer(long long new_size); void replicationSetMaster(char *ip, int port); void replicationUnsetMaster(void); void refreshGoodSlavesCount(void); @@ -2849,7 +2847,7 @@ void abortFailover(const char *err); const char *getFailoverStateString(void); int isReplicaPsyncChannel(client *c); int isReplicaRdbChannel(client *c); -int isOngoingRdbChannelSync(); +int isOngoingRdbChannelSync(void); void abortRdbConnectionSync(int should_retry); void incrReadsProcessed(size_t nread);