From a7aa2b074049a130761bc0a98d47130b6a0ff817 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 24 Mar 2019 15:39:10 -0400 Subject: [PATCH] Active Replica Support --- redis.conf | 5 ++ src/Makefile | 2 +- src/config.c | 11 +++++ src/networking.cpp | 3 +- src/replication.cpp | 110 ++++++++++++++++++++++++++++++++++++++++---- src/server.cpp | 6 +++ src/server.h | 36 +++++++++++---- 7 files changed, 153 insertions(+), 20 deletions(-) diff --git a/redis.conf b/redis.conf index c37989ece..99f66a667 100644 --- a/redis.conf +++ b/redis.conf @@ -1562,3 +1562,8 @@ server-threads 2 # Should KeyDB pin threads to CPUs? By default this is disabled, and KeyDB will not bind threads. # When enabled threads are bount to cores sequentially starting at core 0. # server-thread-affinity true + +# Uncomment the option below to enable Active Active support. Note that +# replicas will still sync in the normal way and incorrect ordering when +# bringing up replicas can result in data loss (the first master will win). +# active-replica yes diff --git a/src/Makefile b/src/Makefile index 7ae04a2fa..70e297929 100644 --- a/src/Makefile +++ b/src/Makefile @@ -94,7 +94,7 @@ endif FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS) FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG) -FINAL_LIBS=-lm +FINAL_LIBS=-lm -luuid DEBUG=-g -ggdb ifeq ($(uname_S),SunOS) diff --git a/src/config.c b/src/config.c index fe563e7ed..dd8bdfdf8 100644 --- a/src/config.c +++ b/src/config.c @@ -849,6 +849,16 @@ void loadServerConfigFromString(char *config) { err = "Unknown argument: server-thread-affinity expects either true or false"; goto loaderr; } + } else if (!strcasecmp(argv[0], "active-replica") && argc == 2) { + server.fActiveReplica = yesnotoi(argv[1]); + if (server.repl_slave_ro) { + server.repl_slave_ro = FALSE; + serverLog(LL_NOTICE, "Notice: \"active-replica yes\" implies \"replica-read-only no\""); + } + if (server.fActiveReplica == -1) { + server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA; + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -2350,6 +2360,7 @@ int rewriteConfig(char *path) { rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL); rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH); rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ); + rewriteConfigYesNoOption(state,"active-replica",server.fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA); /* Rewrite Sentinel config if in Sentinel mode. */ if (server.sentinel_mode) rewriteConfigSentinelOption(state); diff --git a/src/networking.cpp b/src/networking.cpp index b4f14f238..550f84b9f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -226,6 +226,7 @@ client *createClient(int fd, int iel) { c->bufAsync = NULL; c->buflenAsync = 0; c->bufposAsync = 0; + memset(c->uuid, 0, UUID_BINARY_LEN); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); @@ -2135,7 +2136,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ processInputBufferAndReplicate(c); - aelock.arm(nullptr); + aelock.arm(c); ProcessPendingAsyncWrites(); } diff --git a/src/replication.cpp b/src/replication.cpp index 23989428f..5c885659d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -38,6 +38,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); @@ -74,6 +75,21 @@ char *replicationGetSlaveName(client *c) { return buf; } +static bool FSameHost(client *clientA, client *clientB) +{ + const unsigned char *a = clientA->uuid; + const unsigned char *b = clientB->uuid; + + unsigned char zeroCheck = 0; + for (int i = 0; i < UUID_BINARY_LEN; ++i) + { + if (a[i] != b[i]) + return false; + zeroCheck |= a[i]; + } + return (zeroCheck != 0); // if the UUID is nil then it is never equal +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -117,7 +133,13 @@ void resizeReplicationBacklog(long long newsize) { void freeReplicationBacklog(void) { serverAssert(GlobalLocksAcquired()); - serverAssert(listLength(server.slaves) == 0); + listIter li; + listNode *ln; + listRewind(server.slaves, &li); + while ((ln = listNext(&li))) { + // server.slaves should be empty, or filled with clients pending close + serverAssert(((client*)listNodeValue(ln))->flags & CLIENT_CLOSE_ASAP); + } zfree(server.repl_backlog); server.repl_backlog = NULL; } @@ -186,7 +208,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ - if (server.masterhost != NULL) return; + if (!server.fActiveReplica && server.masterhost != NULL) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ @@ -226,6 +248,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { while((ln = listNext(&li))) { client *slave = (client*)ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (server.current_client && FSameHost(server.current_client, slave)) continue; addReplyAsync(slave,selectcmd); } @@ -268,6 +291,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (server.current_client && FSameHost(server.current_client, slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), @@ -282,10 +306,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { addReplyBulkAsync(slave,argv[j]); } - /* Release the lock on all slaves */ listRewind(slaves,&li); while((ln = listNext(&li))) { - ((client*)ln->value)->lock.unlock(); + client *slave = (client*)ln->value; + slave->lock.unlock(); } } @@ -311,6 +335,8 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = (client*)ln->value; std::lock_guardlock)> ulock(slave->lock); + if (FSameHost(slave, server.master)) + continue; // Active Active case, don't feed back /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; @@ -658,9 +684,11 @@ void syncCommand(client *c) { /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ - if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { - addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); - return; + if (!server.fActiveReplica) { + if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { + addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); + return; + } } /* SYNC can't be issued when the server has pending data to send to @@ -790,6 +818,33 @@ void syncCommand(client *c) { return; } +void processReplconfUuid(client *c, robj *arg) +{ + try + { + if (arg->type != OBJ_STRING) + throw "Invalid UUID"; + + const char *remoteUUID = (const char*)ptrFromObj(arg); + if (strlen(remoteUUID) != 36) + throw "Invalid UUID"; + + if (uuid_parse(remoteUUID, c->uuid) != 0) + throw "Invalid UUID"; + + char szServerUUID[36 + 2]; // 1 for the '+', another for '\0' + szServerUUID[0] = '+'; + uuid_unparse(server.uuid, szServerUUID+1); + addReplyProto(c, szServerUUID, 37); + addReplyProto(c, "\r\n", 2); + } + catch (const char *szErr) + { + addReplyError(c, szErr); + return; + } +} + /* REPLCONF