Skip to content

Commit 58aacda

Browse files
authored
Handle array response in parallell with pubsub using RESP3 (#1014)
RESP3 allows sending commands in parallell with pubsub handling and these commands might get responded with a REDIS_REPLY_ARRAY. This conflicts with the pubsub response handling for RESP2 and results in a faulty state when using RESP3. Add functionality to keep track of PUSH/RESP3 support on the connection and only expect the message type REDIS_REPLY_PUSH as subscribe messages when once seen.
1 parent d338426 commit 58aacda

File tree

3 files changed

+24
-2
lines changed

3 files changed

+24
-2
lines changed

async.c

+4-1
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
420420
/* Match reply with the expected format of a pushed message.
421421
* The type and number of elements (3 to 4) are specified at:
422422
* https://redis.io/topics/pubsub#format-of-pushed-messages */
423-
if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) ||
423+
if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
424424
reply->type == REDIS_REPLY_PUSH) {
425425
assert(reply->element[0]->type == REDIS_REPLY_STRING);
426426
stype = reply->element[0]->str;
@@ -525,6 +525,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
525525
break;
526526
}
527527

528+
/* Keep track of push message support for subscribe handling */
529+
if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
530+
528531
/* Send any non-subscribe related PUSH messages to our PUSH handler
529532
* while allowing subscribe related PUSH messages to pass through.
530533
* This allows existing code to be backward compatible and work in

hiredis.h

+3
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ typedef long long ssize_t;
8080
/* Flag that is set when we should set SO_REUSEADDR before calling bind() */
8181
#define REDIS_REUSEADDR 0x80
8282

83+
/* Flag that is set when the async connection supports push replies. */
84+
#define REDIS_SUPPORTS_PUSH 0x100
85+
8386
/**
8487
* Flag that indicates the user does not want the context to
8588
* be automatically freed upon error

test.c

+17-1
Original file line numberDiff line numberDiff line change
@@ -1567,6 +1567,15 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
15671567
exit(1);
15681568
}
15691569

1570+
/* Expect a reply of type INTEGER */
1571+
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
1572+
(void) ac;
1573+
redisReply *reply = r;
1574+
TestState *state = privdata;
1575+
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
1576+
state->checkpoint++;
1577+
}
1578+
15701579
static void test_pubsub_handling_resp3(struct config config) {
15711580
test("Subscribe, handle published message and unsubscribe using RESP3: ");
15721581
/* Setup event dispatcher with a testcase timeout */
@@ -1594,13 +1603,20 @@ static void test_pubsub_handling_resp3(struct config config) {
15941603
TestState state = {.options = &options, .resp3 = 1};
15951604
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
15961605

1606+
/* Make sure non-subscribe commands are handled in RESP3 */
1607+
redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
1608+
redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
1609+
redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
1610+
/* Handle an array with 3 elements as a non-subscribe command */
1611+
redisAsyncCommand(ac,array_cb,&state,"LRANGE mylist 0 2");
1612+
15971613
/* Start event dispatching loop */
15981614
test_cond(event_base_dispatch(base) == 0);
15991615
event_free(timeout);
16001616
event_base_free(base);
16011617

16021618
/* Verify test checkpoints */
1603-
assert(state.checkpoint == 1);
1619+
assert(state.checkpoint == 5);
16041620
}
16051621
#endif
16061622

0 commit comments

Comments
 (0)