Skip to content

Commit

Permalink
Decrement pending_repl_data.len during replica buffer streaming.
Browse files Browse the repository at this point in the history
Stop using stat_repl_processed_bytes to follow streaming progress,
instread keep record of the buffer's peak.
  • Loading branch information
naglera committed Jan 4, 2024
1 parent 33bfd16 commit 21c797e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
11 changes: 8 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2673,19 +2673,19 @@ void fullSyncWithMaster(connection* conn) {
void replDataBufInit(void) {
serverAssert(server.pending_repl_data.blocks == NULL);
server.pending_repl_data.len = 0;
server.pending_repl_data.peak = 0;
server.pending_repl_data.blocks = listCreate();
server.pending_repl_data.blocks->free = zfree;
}

/* Track replication data streaming progress, and serve clients from time to time */
void replStreamProgressCallback(long long offset, int readlen) {
void replStreamProgressCallback(size_t offset, int readlen) {
if (server.loading_process_events_interval_bytes &&
(offset + readlen)/server.loading_process_events_interval_bytes > offset/server.loading_process_events_interval_bytes)
{
replicationSendNewlineToMaster();
processEventsWhileBlocked();
}
atomicIncr(server.stat_repl_processed_bytes, readlen);
}

/* Reads replication data from primary into specified repl buffer block */
Expand Down Expand Up @@ -2748,6 +2748,10 @@ void bufferReplData(connection *conn) {
tail->used = 0;
listAddNodeTail(server.pending_repl_data.blocks, tail);
server.pending_repl_data.len += tail->size;
/* Update buffer's peak */
if (server.pending_repl_data.peak < server.pending_repl_data.len)
server.pending_repl_data.peak = server.pending_repl_data.len;


read = min(readlen, tail->size);
readlen -= read;
Expand Down Expand Up @@ -2777,8 +2781,9 @@ void streamReplDataBufToDb(client *c) {
c->querybuf = sdscatlen(c->querybuf, o->buf, o->used);
c->read_reploff += o->used;
processInputBuffer(c);
offset += o->used;
server.pending_repl_data.len -= o->used;
replStreamProgressCallback(offset, o->used);
offset += o->used;

listDelNode(server.pending_repl_data.blocks, cur);
}
Expand Down
7 changes: 2 additions & 5 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2522,7 +2522,6 @@ void resetServerStats(void) {
atomicSet(server.stat_net_output_bytes, 0);
atomicSet(server.stat_net_repl_input_bytes, 0);
atomicSet(server.stat_net_repl_output_bytes, 0);
atomicSet(server.stat_repl_processed_bytes, 0);
server.stat_unexpected_error_replies = 0;
server.stat_total_error_replies = 0;
server.stat_dump_payload_sanitizations = 0;
Expand Down Expand Up @@ -5643,6 +5642,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"mem_replication_backlog:%zu\r\n"
"mem_total_replication_buffers:%zu\r\n"
"replicas_replication_buffer_size:%zu\r\n"
"replicas_replication_buffer_peak:%zu\r\n"
"mem_clients_slaves:%zu\r\n"
"mem_clients_normal:%zu\r\n"
"mem_cluster_links:%zu\r\n"
Expand Down Expand Up @@ -5698,6 +5698,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
mh->repl_backlog,
server.repl_buffer_mem,
server.pending_repl_data.len,
server.pending_repl_data.peak,
mh->clients_slaves,
mh->clients_normal,
mh->cluster_links,
Expand Down Expand Up @@ -5850,7 +5851,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
long long stat_total_reads_processed, stat_total_writes_processed;
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
long long stat_repl_processed_bytes;
long long current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ?
(long long) elapsedUs(server.stat_last_eviction_exceeded_time): 0;
long long current_active_defrag_time = server.stat_last_active_defrag_time ?
Expand All @@ -5861,7 +5861,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
atomicGet(server.stat_repl_processed_bytes, stat_repl_processed_bytes);

if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
Expand All @@ -5873,7 +5872,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"total_net_output_bytes:%lld\r\n"
"total_net_repl_input_bytes:%lld\r\n"
"total_net_repl_output_bytes:%lld\r\n"
"total_repl_processed_bytes:%lld\r\n"
"instantaneous_input_kbps:%.2f\r\n"
"instantaneous_output_kbps:%.2f\r\n"
"instantaneous_input_repl_kbps:%.2f\r\n"
Expand Down Expand Up @@ -5924,7 +5922,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
stat_net_output_bytes + stat_net_repl_output_bytes,
stat_net_repl_input_bytes,
stat_net_repl_output_bytes,
stat_repl_processed_bytes,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION)/1024,
Expand Down
7 changes: 2 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ typedef struct replBacklog {
typedef struct replDataBuf {
list *blocks; /* List of replDataBufBlock */
size_t len; /* Number of bytes stored in all blocks */
size_t peak;
} replDataBuf;

typedef struct {
Expand Down Expand Up @@ -1700,7 +1701,6 @@ struct redisServer {
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
redisAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
redisAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
redisAtomic long long stat_repl_processed_bytes; /* Bytes processed from replica's local replication buffer after full sync */
size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */
Expand Down Expand Up @@ -1868,10 +1868,7 @@ struct redisServer {
int repl_ping_slave_period; /* Master pings the slave every N seconds */
replBacklog *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
struct { /* Replication data buffer for rdb-channel sync */
list *blocks; /* list of replDataBufBlock */
size_t len;
} pending_repl_data;
replDataBuf pending_repl_data; /* Replication data buffer for rdb-channel sync */
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
Expand Down

0 comments on commit 21c797e

Please sign in to comment.