Skip to content

Commit 49d9824

Browse files
committed
Handle array response in parallell with pubsub using RESP3
RESP3 allows sending commands in parallell with pubsub handling and these commands might get responded with a REDIS_REPLY_ARRAY, like `SORT`. This conflicts with the pubsub response handling for RESP2, and triggers asserts when attempting to find the subscribe callback. Add functionality to keep track of PUSH/RESP3 support on the connection and only expect the message type REDIS_REPLY_PUSH as subscribe messages when once seen.
1 parent f458d1a commit 49d9824

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

async.c

+5-1
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
419419

420420
/* Custom reply functions are not supported for pub/sub. This will fail
421421
* very hard when they are used... */
422-
if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
422+
if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH)) ||
423+
reply->type == REDIS_REPLY_PUSH) {
423424
assert(reply->elements >= 2);
424425
assert(reply->element[0]->type == REDIS_REPLY_STRING);
425426
stype = reply->element[0]->str;
@@ -524,6 +525,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
524525
break;
525526
}
526527

528+
/* Keep track of push message support for subscribe handling */
529+
if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
530+
527531
/* Send any non-subscribe related PUSH messages to our PUSH handler
528532
* while allowing subscribe related PUSH messages to pass through.
529533
* This allows existing code to be backward compatible and work in

hiredis.h

+3
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ typedef long long ssize_t;
8080
/* Flag that is set when we should set SO_REUSEADDR before calling bind() */
8181
#define REDIS_REUSEADDR 0x80
8282

83+
/* Flag that is set when the async connection supports push replies. */
84+
#define REDIS_SUPPORTS_PUSH 0x100
85+
8386
/**
8487
* Flag that indicates the user does not want the context to
8588
* be automatically freed upon error

test.c

+23-1
Original file line numberDiff line numberDiff line change
@@ -1555,6 +1555,24 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
15551555
exit(1);
15561556
}
15571557

1558+
/* Expect a reply with type NIL */
1559+
void nil_cb(redisAsyncContext *ac, void *r, void *privdata) {
1560+
(void) ac;
1561+
redisReply *reply = r;
1562+
TestState *state = privdata;
1563+
assert(reply != NULL && reply->type == REDIS_REPLY_NIL);
1564+
state->checkpoint++;
1565+
}
1566+
1567+
/* Expect a reply with type ARRAY */
1568+
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
1569+
(void) ac;
1570+
redisReply *reply = r;
1571+
TestState *state = privdata;
1572+
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
1573+
state->checkpoint++;
1574+
}
1575+
15581576
static void test_pubsub_handling_resp3(struct config config) {
15591577
test("Subscribe, handle published message and unsubscribe using RESP3: ");
15601578
/* Setup event dispatcher with a testcase timeout */
@@ -1582,13 +1600,17 @@ static void test_pubsub_handling_resp3(struct config config) {
15821600
TestState state = {.options = &options, .resp3 = 1};
15831601
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
15841602

1603+
/* Make sure non-subscribe commands works in RESP3 */
1604+
redisAsyncCommand(ac,nil_cb,&state,"GET nonexisting");
1605+
redisAsyncCommand(ac,array_cb,&state,"SORT nonexisting");
1606+
15851607
/* Start event dispatching loop */
15861608
test_cond(event_base_dispatch(base) == 0);
15871609
event_free(timeout);
15881610
event_base_free(base);
15891611

15901612
/* Verify test checkpoints */
1591-
assert(state.checkpoint == 1);
1613+
assert(state.checkpoint == 3);
15921614
}
15931615
#endif
15941616

0 commit comments

Comments
 (0)