-
Notifications
You must be signed in to change notification settings - Fork 734
/
Copy pathcluster.c
1473 lines (1308 loc) · 57.9 KB
/
cluster.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2009-2012, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* cluster.c contains the common parts of a clustering
* implementation, the parts that are shared between
* any implementation of clustering.
*/
#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include <ctype.h>
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However, if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key, keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s + 1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s + 1) return crc16(key, keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key + s + 1, e - s - 1) & 0x3FFF;
}
/* If it can be inferred that the given glob-style pattern, as implemented in
* stringmatchlen() in util.c, only can match keys belonging to a single slot,
* that slot is returned. Otherwise -1 is returned. */
int patternHashSlot(char *pattern, int length) {
int s = -1; /* index of the first '{' */
for (int i = 0; i < length; i++) {
if (pattern[i] == '*' || pattern[i] == '?' || pattern[i] == '[') {
/* Wildcard or character class found. Keys can be in any slot. */
return -1;
} else if (pattern[i] == '\\') {
/* Escaped character. Computing slot in this case is not
* implemented. We would need a temp buffer. */
return -1;
} else if (s == -1 && pattern[i] == '{') {
/* Opening brace '{' found. */
s = i;
} else if (s >= 0 && pattern[i] == '}' && i == s + 1) {
/* Empty tag '{}' found. The whole key is hashed. Ignore braces. */
s = -2;
} else if (s >= 0 && pattern[i] == '}') {
/* Non-empty tag '{...}' found. Hash what's between braces. */
return crc16(pattern + s + 1, i - s - 1) & 0x3FFF;
}
}
/* The pattern matches a single key. Hash the whole pattern. */
return crc16(pattern, length) & 0x3FFF;
}
ConnectionType *connTypeOfCluster(void) {
if (server.tls_cluster) {
return connectionTypeTls();
}
return connectionTypeTcp();
}
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
/* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
unsigned char buf[2];
uint64_t crc;
/* Serialize the object in an RDB-like format. It consist of an object type
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload, sdsempty());
serverAssert(rdbSaveObjectType(payload, o));
serverAssert(rdbSaveObject(payload, o, key, dbid));
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/
/* RDB version */
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr, buf, 2);
/* CRC64 */
crc = crc64(0, (unsigned char *)payload->io.buffer.ptr, sdslen(payload->io.buffer.ptr));
memrev64ifbe(&crc);
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr, &crc, 8);
}
/* Verify that the RDB version of the dump payload matches the one of this
* instance and that the checksum is ok.
* If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
* is returned. If rdbver_ptr is not NULL, its populated with the value read
* from the input buffer. */
int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
unsigned char *footer;
uint16_t rdbver;
uint64_t crc;
/* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
if (len < 10) return C_ERR;
footer = p + (len - 10);
/* Set and verify RDB version. */
rdbver = (footer[1] << 8) | footer[0];
if (rdbver_ptr) {
*rdbver_ptr = rdbver;
}
if (rdbver > RDB_VERSION) return C_ERR;
if (server.skip_checksum_validation) return C_OK;
/* Verify CRC64 */
crc = crc64(0, p, len - 8);
memrev64ifbe(&crc);
return (memcmp(&crc, footer + 2, 8) == 0) ? C_OK : C_ERR;
}
/* DUMP keyname
* DUMP is actually not used by Cluster but it is the obvious
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
robj *o;
rio payload;
/* Check if the key is here. */
if ((o = lookupKeyRead(c->db, c->argv[1])) == NULL) {
addReplyNull(c);
return;
}
/* Create the DUMP encoded representation. */
createDumpPayload(&payload, o, c->argv[1], c->db->id);
/* Transfer to the client */
addReplyBulkSds(c, payload.io.buffer.ptr);
return;
}
/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
robj *obj;
/* Parse additional options */
for (j = 4; j < c->argc; j++) {
int additional = c->argc - j - 1;
if (!strcasecmp(c->argv[j]->ptr, "replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "absttl")) {
absttl = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "idletime") && additional >= 1 && lfu_freq == -1) {
if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &lru_idle, NULL) != C_OK) return;
if (lru_idle < 0) {
addReplyError(c, "Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++; /* Consume additional arg. */
} else if (!strcasecmp(c->argv[j]->ptr, "freq") && additional >= 1 && lru_idle == -1) {
if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &lfu_freq, NULL) != C_OK) return;
if (lfu_freq < 0 || lfu_freq > 255) {
addReplyError(c, "Invalid FREQ value, must be >= 0 and <= 255");
return;
}
j++; /* Consume additional arg. */
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
/* Make sure this key does not already exist here... */
robj *key = c->argv[1];
if (!replace && lookupKeyWrite(c->db, key) != NULL) {
addReplyErrorObject(c, shared.busykeyerr);
return;
}
/* Check if the TTL value makes sense */
if (getLongLongFromObjectOrReply(c, c->argv[2], &ttl, NULL) != C_OK) {
return;
} else if (ttl < 0) {
addReplyError(c, "Invalid TTL value, must be >= 0");
return;
}
/* Verify RDB version and data checksum. */
if (verifyDumpPayload(c->argv[3]->ptr, sdslen(c->argv[3]->ptr), NULL) == C_ERR) {
addReplyError(c, "DUMP payload version or checksum are wrong");
return;
}
rioInitWithBuffer(&payload, c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type, &payload, key->ptr, c->db->id, NULL)) == NULL)) {
addReplyError(c, "Bad data format");
return;
}
/* Remove the old key if needed. */
int deleted = 0;
if (replace) deleted = dbDelete(c->db, key);
if (ttl && !absttl) ttl += commandTimeSnapshot();
if (ttl && checkAlreadyExpired(ttl)) {
if (deleted) {
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
rewriteClientCommandVector(c, 2, aux, key);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
server.dirty++;
}
decrRefCount(obj);
addReply(c, shared.ok);
return;
}
/* Create the key and set the TTL if any */
dbAdd(c->db, key, obj);
if (ttl) {
setExpire(c, c->db, key, ttl);
if (!absttl) {
/* Propagate TTL as absolute timestamp */
robj *ttl_obj = createStringObjectFromLongLong(ttl);
rewriteClientCommandArgument(c, 2, ttl_obj);
decrRefCount(ttl_obj);
rewriteClientCommandArgument(c, c->argc, shared.absttl);
}
}
objectSetLRUOrLFU(obj, lfu_freq, lru_idle, lru_clock, 1000);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "restore", key, c->db->id);
addReply(c, shared.ok);
server.dirty++;
}
/* MIGRATE socket cache implementation.
*
* We take a map between host:ip and a TCP socket that we used to connect
* to this instance in recent time.
* This sockets are closed when the max number we cache is reached, and also
* in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
typedef struct migrateCachedSocket {
connection *conn;
long last_dbid;
time_t last_use_time;
} migrateCachedSocket;
/* Return a migrateCachedSocket containing a TCP socket connected with the
* target instance, possibly returning a cached one.
*
* This function is responsible of sending errors to the client if a
* connection can't be established. In this case -1 is returned.
* Otherwise on success the socket is returned, and the caller should not
* attempt to free it after usage.
*
* If the caller detects an error while using the socket, migrateCloseSocket()
* should be called so that the connection will be created from scratch
* the next time. */
migrateCachedSocket *migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
connection *conn;
sds name = sdsempty();
migrateCachedSocket *cs;
/* Check if we have an already cached socket for this ip:port pair. */
name = sdscatlen(name, host->ptr, sdslen(host->ptr));
name = sdscatlen(name, ":", 1);
name = sdscatlen(name, port->ptr, sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets, name);
if (cs) {
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs;
}
/* No cached socket, create one. */
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* Too many items, drop one at random. */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets, dictGetKey(de));
}
/* Create the connection */
conn = connCreate(connTypeOfCluster());
if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout) != C_OK) {
addReplyError(c, "-IOERR error or timeout connecting to the client");
connClose(conn);
sdsfree(name);
return NULL;
}
connEnableTcpNoDelay(conn);
/* Add to the cache and return it to the caller. */
cs = zmalloc(sizeof(*cs));
cs->conn = conn;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime;
dictAdd(server.migrate_cached_sockets, name, cs);
return cs;
}
/* Free a migrate cached connection. */
void migrateCloseSocket(robj *host, robj *port) {
sds name = sdsempty();
migrateCachedSocket *cs;
name = sdscatlen(name, host->ptr, sdslen(host->ptr));
name = sdscatlen(name, ":", 1);
name = sdscatlen(name, port->ptr, sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets, name);
if (!cs) {
sdsfree(name);
return;
}
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets, name);
sdsfree(name);
}
void migrateCloseTimedoutSockets(void) {
dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
migrateCachedSocket *cs = dictGetVal(de);
if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets, dictGetKey(de));
}
}
dictReleaseIterator(di);
}
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
* AUTH2 username password]
*
* On in the multiple keys form:
*
* MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
* AUTH2 username password] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) {
migrateCachedSocket *cs;
int copy = 0, replace = 0, j;
char *username = NULL;
char *password = NULL;
long timeout;
long dbid;
robj **ov = NULL; /* Objects to migrate. */
robj **kv = NULL; /* Key names. */
robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
rio cmd, payload;
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;
/* To support the KEYS option we need the following additional state. */
int first_key = 3; /* Argument index of the first key. */
int num_keys = 1; /* By default only migrate the 'key' argument. */
/* Parse additional options */
for (j = 6; j < c->argc; j++) {
int moreargs = (c->argc - 1) - j;
if (!strcasecmp(c->argv[j]->ptr, "copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "auth")) {
if (!moreargs) {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
redactClientCommandArgument(c, j);
} else if (!strcasecmp(c->argv[j]->ptr, "auth2")) {
if (moreargs < 2) {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
username = c->argv[++j]->ptr;
redactClientCommandArgument(c, j);
password = c->argv[++j]->ptr;
redactClientCommandArgument(c, j);
} else if (!strcasecmp(c->argv[j]->ptr, "keys")) {
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c, "When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
first_key = j + 1;
num_keys = c->argc - j - 1;
break; /* All the remaining args are keys. */
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
/* Sanity check */
if (getLongFromObjectOrReply(c, c->argv[5], &timeout, NULL) != C_OK ||
getLongFromObjectOrReply(c, c->argv[4], &dbid, NULL) != C_OK) {
return;
}
if (timeout <= 0) timeout = 1000;
/* Check if the keys are here. If at least one key is to migrate, do it
* otherwise if all the keys are missing reply with "NOKEY" to signal
* the caller there was nothing to migrate. We don't return an error in
* this case, since often this is due to a normal condition like the key
* expiring in the meantime. */
ov = zrealloc(ov, sizeof(robj *) * num_keys);
kv = zrealloc(kv, sizeof(robj *) * num_keys);
int oi = 0;
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db, c->argv[first_key + j])) != NULL) {
kv[oi] = c->argv[first_key + j];
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
zfree(ov);
zfree(kv);
addReplySds(c, sdsnew("+NOKEY\r\n"));
return;
}
try_again:
write_error = 0;
/* Connect */
cs = migrateGetSocket(c, c->argv[1], c->argv[2], timeout);
if (cs == NULL) {
zfree(ov);
zfree(kv);
return; /* error sent to the client by migrateGetSocket() */
}
rioInitWithBuffer(&cmd, sdsempty());
/* Authentication */
if (password) {
int arity = username ? 3 : 2;
serverAssertWithInfo(c, NULL, rioWriteBulkCount(&cmd, '*', arity));
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, "AUTH", 4));
if (username) {
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, username, sdslen(username)));
}
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, password, sdslen(password)));
}
/* Send the SELECT command if the current DB is not already selected. */
int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
if (select) {
serverAssertWithInfo(c, NULL, rioWriteBulkCount(&cmd, '*', 2));
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, "SELECT", 6));
serverAssertWithInfo(c, NULL, rioWriteBulkLongLong(&cmd, dbid));
}
int non_expired = 0; /* Number of keys that we'll find non expired.
Note that serializing large keys may take some time
so certain keys that were found non expired by the
lookupKey() function, may be expired later. */
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db, kv[j]);
if (expireat != -1) {
ttl = expireat - commandTimeSnapshot();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
/* Relocate valid (non expired) keys and values into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];
serverAssertWithInfo(c, NULL, rioWriteBulkCount(&cmd, '*', replace ? 5 : 4));
if (server.cluster_enabled)
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, "RESTORE-ASKING", 14));
else
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, "RESTORE", 7));
serverAssertWithInfo(c, NULL, sdsEncodedObject(kv[j]));
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, kv[j]->ptr, sdslen(kv[j]->ptr)));
serverAssertWithInfo(c, NULL, rioWriteBulkLongLong(&cmd, ttl));
/* Emit the payload argument, that is the serialized object using
* the DUMP format. */
createDumpPayload(&payload, ov[j], kv[j], dbid);
serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
/* Add the REPLACE option to the RESTORE command if it was specified
* as a MIGRATE option. */
if (replace) serverAssertWithInfo(c, NULL, rioWriteBulkString(&cmd, "REPLACE", 7));
}
/* Fix the actual number of keys we are migrating. */
num_keys = non_expired;
/* Transfer the query to the other node in 64K chunks. */
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
while ((towrite = sdslen(buf) - pos) > 0) {
towrite = (towrite > (64 * 1024) ? (64 * 1024) : towrite);
nwritten = connSyncWrite(cs->conn, buf + pos, towrite, timeout);
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}
char buf0[1024]; /* Auth reply. */
char buf1[1024]; /* Select reply. */
char buf2[1024]; /* Restore reply. */
/* Read the AUTH reply if needed. */
if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0) goto socket_err;
/* Read the SELECT reply if needed. */
if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0) goto socket_err;
/* Read the RESTORE replies. */
int error_from_target = 0;
int socket_error = 0;
int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
/* Allocate the new argument vector that will replace the current command,
* to propagate the MIGRATE as a DEL command (if no COPY option was given).
* We allocate num_keys+1 because the additional argument is for "DEL"
* command name itself. */
if (!copy) newargv = zmalloc(sizeof(robj *) * (num_keys + 1));
for (j = 0; j < num_keys; j++) {
if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') || (select && buf1[0] == '-') || buf2[0] == '-') {
/* On error assume that last_dbid is no longer valid. */
if (!error_from_target) {
cs->last_dbid = -1;
char *errbuf;
if (password && buf0[0] == '-')
errbuf = buf0;
else if (select && buf1[0] == '-')
errbuf = buf1;
else
errbuf = buf2;
error_from_target = 1;
addReplyErrorFormat(c, "Target instance replied with error: %s", errbuf + 1);
}
} else {
if (!copy) {
/* No COPY option: remove the local key, signal the change. */
dbDelete(c->db, kv[j]);
signalModifiedKey(c, c->db, kv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", kv[j], c->db->id);
server.dirty++;
/* Populate the argument vector to replace the old one. */
newargv[del_idx++] = kv[j];
incrRefCount(kv[j]);
}
}
}
/* On socket error, if we want to retry, do it now before rewriting the
* command vector. We only retry if we are sure nothing was processed
* and we failed to read the first reply (j == 0 test). */
if (!error_from_target && socket_error && j == 0 && may_retry && errno != ETIMEDOUT) {
goto socket_err; /* A retry is guaranteed because of tested conditions.*/
}
/* On socket errors, close the migration socket now that we still have
* the original host/port in the ARGV. Later the original command may be
* rewritten to DEL and will be too later. */
if (socket_error) migrateCloseSocket(c->argv[1], c->argv[2]);
if (!copy) {
/* Translate MIGRATE as DEL for replication/AOF. Note that we do
* this only for the keys for which we received an acknowledgement
* from the receiving server, by using the del_idx index. */
if (del_idx > 1) {
newargv[0] = createStringObject("DEL", 3);
/* Note that the following call takes ownership of newargv. */
replaceClientCommandVector(c, del_idx, newargv);
argv_rewritten = 1;
} else {
/* No key transfer acknowledged, no need to rewrite as DEL. */
zfree(newargv);
}
newargv = NULL; /* Make it safe to call zfree() on it in the future. */
}
/* If we are here and a socket error happened, we don't want to retry.
* Just signal the problem to the client, but only do it if we did not
* already queue a different error reported by the destination server. */
if (!error_from_target && socket_error) {
may_retry = 0;
goto socket_err;
}
if (!error_from_target) {
/* Success! Update the last_dbid in migrateCachedSocket, so that we can
* avoid SELECT the next time if the target DB is the same. Reply +OK.
*
* Note: If we reached this point, even if socket_error is true
* still the SELECT command succeeded (otherwise the code jumps to
* socket_err label. */
cs->last_dbid = dbid;
addReply(c, shared.ok);
} else {
/* On error we already sent it in the for loop above, and set
* the currently selected socket to -1 to force SELECT the next time. */
}
sdsfree(cmd.io.buffer.ptr);
zfree(ov);
zfree(kv);
zfree(newargv);
return;
/* On socket errors we try to close the cached socket and try again.
* It is very common for the cached socket to get closed, if just reopening
* it works it's a shame to notify the error to the caller. */
socket_err:
/* Cleanup we want to perform in both the retry and no retry case.
* Note: Closing the migrate socket will also force SELECT next time. */
sdsfree(cmd.io.buffer.ptr);
/* If the command was rewritten as DEL and there was a socket error,
* we already closed the socket earlier. While migrateCloseSocket()
* is idempotent, the host/port arguments are now gone, so don't do it
* again. */
if (!argv_rewritten) migrateCloseSocket(c->argv[1], c->argv[2]);
zfree(newargv);
newargv = NULL; /* This will get reallocated on retry. */
/* Retry only if it's not a timeout and we never attempted a retry
* (or the code jumping here did not set may_retry to zero). */
if (errno != ETIMEDOUT && may_retry) {
may_retry = 0;
goto try_again;
}
/* Cleanup we want to do if no retry is attempted. */
zfree(ov);
zfree(kv);
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-IOERR error or timeout %s to target instance",
write_error ? "writing" : "reading"));
return;
}
/* Cluster node sanity check. Returns C_OK if the node id
* is valid an C_ERR otherwise. */
int verifyClusterNodeId(const char *name, int length) {
if (length != CLUSTER_NAMELEN) return C_ERR;
for (int i = 0; i < length; i++) {
if (name[i] >= 'a' && name[i] <= 'z') continue;
if (name[i] >= '0' && name[i] <= '9') continue;
return C_ERR;
}
return C_OK;
}
int isValidAuxChar(int c) {
return isalnum(c) || (strchr("!#$%&()*+.:;<>?@[]^{|}~", c) == NULL);
}
int isValidAuxString(char *s, unsigned int length) {
for (unsigned i = 0; i < length; i++) {
if (!isValidAuxChar(s[i])) return 0;
}
return 1;
}
void clusterCommandMyId(client *c) {
char *name = clusterNodeGetName(getMyClusterNode());
if (name) {
addReplyBulkCBuffer(c, name, CLUSTER_NAMELEN);
} else {
addReplyError(c, "No ID yet");
}
}
int clusterNodeIsMyself(clusterNode *n) {
return n == getMyClusterNode();
}
void clusterCommandMyShardId(client *c) {
char *sid = clusterNodeGetShardId(getMyClusterNode());
if (sid) {
addReplyBulkCBuffer(c, sid, CLUSTER_NAMELEN);
} else {
addReplyError(c, "No shard ID yet");
}
}
/* When a cluster command is called, we need to decide whether to return TLS info or
* non-TLS info by the client's connection type. However if the command is called by
* a Lua script or RM_call, there is no connection in the fake client, so we use
* server.current_client here to get the real client if available. And if it is not
* available (modules may call commands without a real client), we return the default
* info, which is determined by server.tls_cluster. */
static int shouldReturnTlsInfo(void) {
if (server.current_client && server.current_client->conn) {
return connIsTLS(server.current_client->conn);
} else {
return server.tls_cluster;
}
}
unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
}
void clusterCommandHelp(client *c) {
const char *help[] = {
"COUNTKEYSINSLOT <slot>",
" Return the number of keys in <slot>.",
"GETKEYSINSLOT <slot> <count>",
" Return key names stored by current node in a slot.",
"INFO",
" Return information about the cluster.",
"KEYSLOT <key>",
" Return the hash slot for <key>.",
"MYID",
" Return the node id.",
"MYSHARDID",
" Return the node's shard id.",
"NODES",
" Return cluster configuration seen by node. Output format:",
" <id> <ip:port@bus-port[,hostname]> <flags> <primary> <pings> <pongs> <epoch> <link> <slot> ...",
"REPLICAS <node-id>",
" Return <node-id> replicas.",
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, primary and replicas IP addresses, ports and ids",
"SHARDS",
" Return information about slot range mappings and the nodes associated with them.",
NULL};
addExtendedReplyHelp(c, help, clusterCommandExtendedHelp());
}
void clusterCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) {
clusterCommandHelp(c);
} else if (!strcasecmp(c->argv[1]->ptr, "nodes") && c->argc == 2) {
/* CLUSTER NODES */
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo());
addReplyVerbatim(c, nodes, sdslen(nodes), "txt");
sdsfree(nodes);
} else if (!strcasecmp(c->argv[1]->ptr, "myid") && c->argc == 2) {
/* CLUSTER MYID */
clusterCommandMyId(c);
} else if (!strcasecmp(c->argv[1]->ptr, "myshardid") && c->argc == 2) {
/* CLUSTER MYSHARDID */
clusterCommandMyShardId(c);
} else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc == 2) {
/* CLUSTER SLOTS */
clusterCommandSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr, "shards") && c->argc == 2) {
/* CLUSTER SHARDS */
clusterCommandShards(c);
} else if (!strcasecmp(c->argv[1]->ptr, "info") && c->argc == 2) {
/* CLUSTER INFO */
sds info = genClusterInfoString();
/* Produce the reply protocol. */
addReplyVerbatim(c, info, sdslen(info), "txt");
sdsfree(info);
} else if (!strcasecmp(c->argv[1]->ptr, "keyslot") && c->argc == 3) {
/* CLUSTER KEYSLOT <key> */
sds key = c->argv[2]->ptr;
addReplyLongLong(c, keyHashSlot(key, sdslen(key)));
} else if (!strcasecmp(c->argv[1]->ptr, "countkeysinslot") && c->argc == 3) {
/* CLUSTER COUNTKEYSINSLOT <slot> */
long long slot;
if (getLongLongFromObjectOrReply(c, c->argv[2], &slot, NULL) != C_OK) return;
if (slot < 0 || slot >= CLUSTER_SLOTS) {
addReplyError(c, "Invalid slot");
return;
}
addReplyLongLong(c, countKeysInSlot(slot));
} else if (!strcasecmp(c->argv[1]->ptr, "getkeysinslot") && c->argc == 4) {
/* CLUSTER GETKEYSINSLOT <slot> <count> */
long long maxkeys, slot;
if (getLongLongFromObjectOrReply(c, c->argv[2], &slot, NULL) != C_OK) return;
if (getLongLongFromObjectOrReply(c, c->argv[3], &maxkeys, NULL) != C_OK) return;
if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
addReplyError(c, "Invalid slot or number of keys");
return;
}
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = kvstoreDictIteratorNext(kvs_di);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
kvstoreReleaseDictIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
int j;
/* Lookup the specified node in our table. */
if (!n) {
addReplyErrorFormat(c, "Unknown node %s", (char *)c->argv[2]->ptr);
return;
}
if (clusterNodeIsReplica(n)) {
addReplyError(c, "The specified node is not a master");
return;
}
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyArrayLen(c, clusterNodeNumReplicas(n));
for (j = 0; j < clusterNodeNumReplicas(n); j++) {
sds ni = clusterGenNodeDescription(c, clusterNodeGetReplica(n, j), shouldReturnTlsInfo());
addReplyBulkCString(c, ni);
sdsfree(ni);
}
} else if (!clusterCommandSpecial(c)) {
addReplySubcommandSyntaxError(c);
return;
}
}
/* Return the pointer to the cluster node that is able to serve the command.
* For the function to succeed the command should only target either:
*
* 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
* 2) Multiple keys in the same hash slot, while the slot is stable (no
* resharding in progress).
*
* On success the function returns the node that is able to serve the request.
* If the node is not 'myself' a redirection must be performed. The kind of
* redirection is specified setting the integer passed by reference
* 'error_code', which will be set to CLUSTER_REDIR_ASK or
* CLUSTER_REDIR_MOVED.
*
* When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
*
* If the command fails NULL is returned, and the reason of the failure is
* provided via 'error_code', which will be set to:
*
* CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
* don't belong to the same hash slot.
*
* CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress).
*
* CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
* not bound to any node. In this case the cluster global state should be
* already "down" but it is fragile to rely on the update of the global state,
* so we also handle it here.
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *
getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *myself = getMyClusterNode();
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, existing_keys = 0;
/* Allow any key to be set if a module disabled cluster redirections. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) return myself;
/* Set error code optimistically for the base case. */
if (error_code) *error_code = CLUSTER_REDIR_NONE;
/* Modules can turn off Cluster redirection: this is useful
* when writing a module that implements a completely different
* distributed system. */
/* We handle all the cases as if they were EXEC commands, so we have
* a common code path for everything */
if (cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!c->flag.multi) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;