Skip to content

Commit

Permalink
Active Replica Support
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSully committed Mar 24, 2019
1 parent fb45fef commit a7aa2b0
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 20 deletions.
5 changes: 5 additions & 0 deletions redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1562,3 +1562,8 @@ server-threads 2
# Should KeyDB pin threads to CPUs? By default this is disabled, and KeyDB will not bind threads.
# When enabled threads are bount to cores sequentially starting at core 0.
# server-thread-affinity true

# Uncomment the option below to enable Active Active support. Note that
# replicas will still sync in the normal way and incorrect ordering when
# bringing up replicas can result in data loss (the first master will win).
# active-replica yes
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ endif
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS)
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS)
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG)
FINAL_LIBS=-lm
FINAL_LIBS=-lm -luuid
DEBUG=-g -ggdb

ifeq ($(uname_S),SunOS)
Expand Down
11 changes: 11 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,16 @@ void loadServerConfigFromString(char *config) {
err = "Unknown argument: server-thread-affinity expects either true or false";
goto loaderr;
}
} else if (!strcasecmp(argv[0], "active-replica") && argc == 2) {
server.fActiveReplica = yesnotoi(argv[1]);
if (server.repl_slave_ro) {
server.repl_slave_ro = FALSE;
serverLog(LL_NOTICE, "Notice: \"active-replica yes\" implies \"replica-read-only no\"");
}
if (server.fActiveReplica == -1) {
server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA;
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
Expand Down Expand Up @@ -2350,6 +2360,7 @@ int rewriteConfig(char *path) {
rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL);
rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH);
rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ);
rewriteConfigYesNoOption(state,"active-replica",server.fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA);

/* Rewrite Sentinel config if in Sentinel mode. */
if (server.sentinel_mode) rewriteConfigSentinelOption(state);
Expand Down
3 changes: 2 additions & 1 deletion src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ client *createClient(int fd, int iel) {
c->bufAsync = NULL;
c->buflenAsync = 0;
c->bufposAsync = 0;
memset(c->uuid, 0, UUID_BINARY_LEN);

listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
Expand Down Expand Up @@ -2135,7 +2136,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
* corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */
processInputBufferAndReplicate(c);
aelock.arm(nullptr);
aelock.arm(c);
ProcessPendingAsyncWrites();
}

Expand Down
110 changes: 102 additions & 8 deletions src/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
#include <mutex>
#include <uuid/uuid.h>

void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
Expand Down Expand Up @@ -74,6 +75,21 @@ char *replicationGetSlaveName(client *c) {
return buf;
}

static bool FSameHost(client *clientA, client *clientB)
{
const unsigned char *a = clientA->uuid;
const unsigned char *b = clientB->uuid;

unsigned char zeroCheck = 0;
for (int i = 0; i < UUID_BINARY_LEN; ++i)
{
if (a[i] != b[i])
return false;
zeroCheck |= a[i];
}
return (zeroCheck != 0); // if the UUID is nil then it is never equal
}

/* ---------------------------------- MASTER -------------------------------- */

void createReplicationBacklog(void) {
Expand Down Expand Up @@ -117,7 +133,13 @@ void resizeReplicationBacklog(long long newsize) {

void freeReplicationBacklog(void) {
serverAssert(GlobalLocksAcquired());
serverAssert(listLength(server.slaves) == 0);
listIter li;
listNode *ln;
listRewind(server.slaves, &li);
while ((ln = listNext(&li))) {
// server.slaves should be empty, or filled with clients pending close
serverAssert(((client*)listNodeValue(ln))->flags & CLIENT_CLOSE_ASAP);
}
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
Expand Down Expand Up @@ -186,7 +208,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* propagate *identical* replication stream. In this way this slave can
* advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */
if (server.masterhost != NULL) return;
if (!server.fActiveReplica && server.masterhost != NULL) return;

/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
Expand Down Expand Up @@ -226,6 +248,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (server.current_client && FSameHost(server.current_client, slave)) continue;
addReplyAsync(slave,selectcmd);
}

Expand Down Expand Up @@ -268,6 +291,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (server.current_client && FSameHost(server.current_client, slave)) continue;

/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
Expand All @@ -282,10 +306,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
addReplyBulkAsync(slave,argv[j]);
}

/* Release the lock on all slaves */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
((client*)ln->value)->lock.unlock();
client *slave = (client*)ln->value;
slave->lock.unlock();
}
}

Expand All @@ -311,6 +335,8 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
std::lock_guard<decltype(slave->lock)> ulock(slave->lock);
if (FSameHost(slave, server.master))
continue; // Active Active case, don't feed back

/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
Expand Down Expand Up @@ -658,9 +684,11 @@ void syncCommand(client *c) {

/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
if (!server.fActiveReplica) {
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}
}

/* SYNC can't be issued when the server has pending data to send to
Expand Down Expand Up @@ -790,6 +818,33 @@ void syncCommand(client *c) {
return;
}

void processReplconfUuid(client *c, robj *arg)
{
try
{
if (arg->type != OBJ_STRING)
throw "Invalid UUID";

const char *remoteUUID = (const char*)ptrFromObj(arg);
if (strlen(remoteUUID) != 36)
throw "Invalid UUID";

if (uuid_parse(remoteUUID, c->uuid) != 0)
throw "Invalid UUID";

char szServerUUID[36 + 2]; // 1 for the '+', another for '\0'
szServerUUID[0] = '+';
uuid_unparse(server.uuid, szServerUUID+1);
addReplyProto(c, szServerUUID, 37);
addReplyProto(c, "\r\n", 2);
}
catch (const char *szErr)
{
addReplyError(c, szErr);
return;
}
}

/* REPLCONF <option> <value> <option> <value> ...
* This command is used by a slave in order to configure the replication
* process before starting it with the SYNC command.
Expand Down Expand Up @@ -860,6 +915,10 @@ void replconfCommand(client *c) {
* to the slave. */
if (server.masterhost && server.master) replicationSendAck();
return;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"uuid")) {
/* REPLCONF uuid is used to set and send the UUID of each host */
processReplconfUuid(c, c->argv[j+1]);
return; // the process function replies to the client for both error and success
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)ptrFromObj(c->argv[j]));
Expand Down Expand Up @@ -1134,6 +1193,10 @@ void replicationCreateMasterClient(int fd, int dbid) {
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
server.master->puser = NULL; /* This client can do everything. */

memcpy(server.master->uuid, server.master_uuid, UUID_BINARY_LEN);
memset(server.master_uuid, 0, UUID_BINARY_LEN); // make sure people don't use this temp storage buffer

memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
Expand Down Expand Up @@ -1738,7 +1801,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
server.repl_state = REPL_STATE_SEND_PORT;
server.repl_state = REPL_STATE_SEND_UUID;
}
}

Expand All @@ -1751,7 +1814,38 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
goto error;
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_UUID;
}

/* Send UUID */
if (server.repl_state == REPL_STATE_SEND_UUID) {
char szUUID[37];
memset(server.master_uuid, 0, UUID_BINARY_LEN);
uuid_unparse((unsigned char*)server.uuid, szUUID);
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","uuid",szUUID);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_UUID;
return;
}

/* Receive UUID */
if (server.repl_state == REPL_STATE_RECEIVE_UUID) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
serverLog(LL_WARNING, "non-fatal: Master doesn't understand REPLCONF uuid");
}
else {
if (strlen(err) != 37 // 36-byte UUID string and the leading '+'
|| uuid_parse(err+1, server.master_uuid) != 0)
{
serverLog(LL_WARNING, "Master replied with a UUID we don't understand");
sdsfree(err);
goto error;
}
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT;
// fallthrough
}

/* Set the slave port, so that Master's INFO command can list the
Expand Down
6 changes: 6 additions & 0 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include <locale.h>
#include <sys/socket.h>
#include <algorithm>
#include <uuid/uuid.h>

/* Our shared "common" objects */

Expand Down Expand Up @@ -2371,6 +2372,7 @@ void initServerConfig(void) {
server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL;
server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA;

unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
Expand Down Expand Up @@ -2962,6 +2964,10 @@ void initServer(void) {
server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
}

/* Generate UUID */
static_assert(sizeof(uuid_t) == sizeof(server.uuid), "UUIDs are standardized at 16-bytes");
uuid_generate((unsigned char*)server.uuid);

if (server.cluster_enabled) clusterInit();
replicationScriptCacheInit();
scriptingInit(1);
Expand Down
36 changes: 26 additions & 10 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ typedef long long mstime_t; /* millisecond time type. */
extern "C" {
#endif

#define UUID_BINARY_LEN 16

/* Error codes */
#define C_OK 0
#define C_ERR -1
Expand Down Expand Up @@ -186,6 +188,8 @@ extern "C" {
#define CONFIG_DEFAULT_THREADS 1
#define CONFIG_DEFAULT_THREAD_AFFINITY 0

#define CONFIG_DEFAULT_ACTIVE_REPLICA 0

#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
Expand Down Expand Up @@ -335,17 +339,19 @@ extern "C" {
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */
#define REPL_STATE_SEND_UUID 6 /* send our UUID */
#define REPL_STATE_RECEIVE_UUID 7 /* they should ack with their UUID */
#define REPL_STATE_SEND_PORT 8 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 9 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 10 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 11 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 12 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 13 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 14 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 15 /* Wait for PSYNC reply */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 15 /* Connected to master */
#define REPL_STATE_TRANSFER 16 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 17 /* Connected to master */

/* State of slaves from the POV of the master. Used in client->replstate.
* In SEND_BULK and ONLINE state the slave receives new updates
Expand Down Expand Up @@ -871,6 +877,11 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */

/* UUID announced by the client (default nil) - used to detect multiple connections to/from the same peer */
/* compliant servers will announce their UUIDs when a replica connection is started, and return when asked */
/* UUIDs are transient and lost when the server is shut down */
unsigned char uuid[UUID_BINARY_LEN];

/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
Expand Down Expand Up @@ -1423,6 +1434,11 @@ struct redisServer {
pthread_mutex_t next_client_id_mutex;
pthread_mutex_t unixtime_mutex;

int fActiveReplica; /* Can this replica also be a master? */
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in server.master */

struct fastlock flock;
};

Expand Down

0 comments on commit a7aa2b0

Please sign in to comment.