Skip to content

Commit d7683f3

Browse files
Merge pull request #1047 from Nordix/unsubscribe-handling
Unsubscribe handling in async
2 parents 7c44a9d + 7123b87 commit d7683f3

File tree

3 files changed

+97
-40
lines changed

3 files changed

+97
-40
lines changed

async.c

+85-36
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
148148
ac->sub.replies.tail = NULL;
149149
ac->sub.channels = channels;
150150
ac->sub.patterns = patterns;
151+
ac->sub.pending_unsubs = 0;
151152

152153
return ac;
153154
oom:
@@ -411,11 +412,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
411412
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
412413
redisContext *c = &(ac->c);
413414
dict *callbacks;
414-
redisCallback *cb;
415+
redisCallback *cb = NULL;
415416
dictEntry *de;
416417
int pvariant;
417418
char *stype;
418-
sds sname;
419+
sds sname = NULL;
419420

420421
/* Match reply with the expected format of a pushed message.
421422
* The type and number of elements (3 to 4) are specified at:
@@ -432,42 +433,43 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
432433
callbacks = ac->sub.channels;
433434

434435
/* Locate the right callback */
435-
assert(reply->element[1]->type == REDIS_REPLY_STRING);
436-
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
437-
if (sname == NULL)
438-
goto oom;
436+
if (reply->element[1]->type == REDIS_REPLY_STRING) {
437+
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
438+
if (sname == NULL) goto oom;
439439

440-
de = dictFind(callbacks,sname);
441-
if (de != NULL) {
442-
cb = dictGetEntryVal(de);
443-
444-
/* If this is an subscribe reply decrease pending counter. */
445-
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
446-
cb->pending_subs -= 1;
440+
if ((de = dictFind(callbacks,sname)) != NULL) {
441+
cb = dictGetEntryVal(de);
442+
memcpy(dstcb,cb,sizeof(*dstcb));
447443
}
444+
}
448445

449-
memcpy(dstcb,cb,sizeof(*dstcb));
450-
451-
/* If this is an unsubscribe message, remove it. */
452-
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
453-
if (cb->pending_subs == 0)
454-
dictDelete(callbacks,sname);
455-
456-
/* If this was the last unsubscribe message, revert to
457-
* non-subscribe mode. */
458-
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
459-
460-
/* Unset subscribed flag only when no pipelined pending subscribe. */
461-
if (reply->element[2]->integer == 0
462-
&& dictSize(ac->sub.channels) == 0
463-
&& dictSize(ac->sub.patterns) == 0) {
464-
c->flags &= ~REDIS_SUBSCRIBED;
465-
466-
/* Move ongoing regular command callbacks. */
467-
redisCallback cb;
468-
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
469-
__redisPushCallback(&ac->replies,&cb);
470-
}
446+
/* If this is an subscribe reply decrease pending counter. */
447+
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
448+
assert(cb != NULL);
449+
cb->pending_subs -= 1;
450+
451+
} else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
452+
if (cb == NULL)
453+
ac->sub.pending_unsubs -= 1;
454+
else if (cb->pending_subs == 0)
455+
dictDelete(callbacks,sname);
456+
457+
/* If this was the last unsubscribe message, revert to
458+
* non-subscribe mode. */
459+
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
460+
461+
/* Unset subscribed flag only when no pipelined pending subscribe
462+
* or pending unsubscribe replies. */
463+
if (reply->element[2]->integer == 0
464+
&& dictSize(ac->sub.channels) == 0
465+
&& dictSize(ac->sub.patterns) == 0
466+
&& ac->sub.pending_unsubs == 0) {
467+
c->flags &= ~REDIS_SUBSCRIBED;
468+
469+
/* Move ongoing regular command callbacks. */
470+
redisCallback cb;
471+
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
472+
__redisPushCallback(&ac->replies,&cb);
471473
}
472474
}
473475
}
@@ -540,7 +542,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
540542

541543
/* Even if the context is subscribed, pending regular
542544
* callbacks will get a reply before pub/sub messages arrive. */
543-
redisCallback cb = {NULL, NULL, 0, NULL};
545+
redisCallback cb = {NULL, NULL, 0, 0, NULL};
544546
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
545547
/*
546548
* A spontaneous reply in a not-subscribed context can be the error
@@ -757,6 +759,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
757759
redisContext *c = &(ac->c);
758760
redisCallback cb;
759761
struct dict *cbdict;
762+
dictIterator it;
760763
dictEntry *de;
761764
redisCallback *existcb;
762765
int pvariant, hasnext;
@@ -773,6 +776,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
773776
cb.fn = fn;
774777
cb.privdata = privdata;
775778
cb.pending_subs = 1;
779+
cb.unsubscribe_sent = 0;
776780

777781
/* Find out which command will be appended. */
778782
p = nextArgument(cmd,&cstr,&clen);
@@ -812,6 +816,51 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
812816
* subscribed to one or more channels or patterns. */
813817
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
814818

819+
if (pvariant)
820+
cbdict = ac->sub.patterns;
821+
else
822+
cbdict = ac->sub.channels;
823+
824+
if (hasnext) {
825+
/* Send an unsubscribe with specific channels/patterns.
826+
* Bookkeeping the number of expected replies */
827+
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
828+
sname = sdsnewlen(astr,alen);
829+
if (sname == NULL)
830+
goto oom;
831+
832+
de = dictFind(cbdict,sname);
833+
if (de != NULL) {
834+
existcb = dictGetEntryVal(de);
835+
if (existcb->unsubscribe_sent == 0)
836+
existcb->unsubscribe_sent = 1;
837+
else
838+
/* Already sent, reply to be ignored */
839+
ac->sub.pending_unsubs += 1;
840+
} else {
841+
/* Not subscribed to, reply to be ignored */
842+
ac->sub.pending_unsubs += 1;
843+
}
844+
sdsfree(sname);
845+
}
846+
} else {
847+
/* Send an unsubscribe without specific channels/patterns.
848+
* Bookkeeping the number of expected replies */
849+
int no_subs = 1;
850+
dictInitIterator(&it,cbdict);
851+
while ((de = dictNext(&it)) != NULL) {
852+
existcb = dictGetEntryVal(de);
853+
if (existcb->unsubscribe_sent == 0) {
854+
existcb->unsubscribe_sent = 1;
855+
no_subs = 0;
856+
}
857+
}
858+
/* Unsubscribing to all channels/patterns, where none is
859+
* subscribed to, results in a single reply to be ignored. */
860+
if (no_subs == 1)
861+
ac->sub.pending_unsubs += 1;
862+
}
863+
815864
/* (P)UNSUBSCRIBE does not have its own response: every channel or
816865
* pattern that is unsubscribed will receive a message. This means we
817866
* should not append a callback function for this command. */

async.h

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct redisCallback {
4646
struct redisCallback *next; /* simple singly linked list */
4747
redisCallbackFn *fn;
4848
int pending_subs;
49+
int unsubscribe_sent;
4950
void *privdata;
5051
} redisCallback;
5152

@@ -105,6 +106,7 @@ typedef struct redisAsyncContext {
105106
redisCallbackList replies;
106107
struct dict *channels;
107108
struct dict *patterns;
109+
int pending_unsubs;
108110
} sub;
109111

110112
/* Any configured RESP3 PUSH handler */

test.c

+10-4
Original file line numberDiff line numberDiff line change
@@ -1729,10 +1729,14 @@ void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
17291729
strcmp(reply->element[2]->str,"Hello!") == 0);
17301730
state->checkpoint++;
17311731

1732-
/* Unsubscribe to channels, including a channel X which we don't subscribe to */
1732+
/* Unsubscribe to channels, including channel X & Z which we don't subscribe to */
17331733
redisAsyncCommand(ac,unexpected_cb,
17341734
(void*)"unsubscribe should not call unexpected_cb()",
1735-
"unsubscribe B X A");
1735+
"unsubscribe B X A A Z");
1736+
/* Unsubscribe to patterns, none which we subscribe to */
1737+
redisAsyncCommand(ac,unexpected_cb,
1738+
(void*)"punsubscribe should not call unexpected_cb()",
1739+
"punsubscribe");
17361740
/* Send a regular command after unsubscribing, then disconnect */
17371741
state->disconnect = 1;
17381742
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
@@ -1767,8 +1771,10 @@ void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
17671771

17681772
/* Test handling of multiple channels
17691773
* - subscribe to channel A and B
1770-
* - a published message on A triggers an unsubscribe of channel B, X and A
1771-
* where channel X is not subscribed to.
1774+
* - a published message on A triggers an unsubscribe of channel B, X, A and Z
1775+
* where channel X and Z are not subscribed to.
1776+
* - the published message also triggers an unsubscribe to patterns. Since no
1777+
* pattern is subscribed to the responded pattern element type is NIL.
17721778
* - a command sent after unsubscribe triggers a disconnect */
17731779
static void test_pubsub_multiple_channels(struct config config) {
17741780
test("Subscribe to multiple channels: ");

0 commit comments

Comments
 (0)