Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resp3 oob push support #841

Merged
merged 14 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
/*.a
/*.pc
*.dSYM
tags
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ before_script:

addons:
apt:
sources:
- sourceline: 'ppa:chris-lea/redis-server'
packages:
- libc6-dbg
- libc6-dev
Expand All @@ -35,6 +37,7 @@ addons:
- libssl-dev
- libssl-dev:i386
- valgrind
- redis

env:
- BITS="32"
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o
SSL_OBJ=ssl.o
EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib
EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push
ifeq ($(USE_SSL),1)
EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl
endif
Expand Down Expand Up @@ -161,6 +161,7 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)


ifndef AE_DIR
hiredis-example-ae:
@echo "Please specify AE_DIR (e.g. <redis repository>/src)"
Expand Down Expand Up @@ -195,6 +196,9 @@ endif
hiredis-example: examples/example.c $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)

hiredis-example-push: examples/example-push.c $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)

examples: $(EXAMPLES)

TEST_LIBS = $(STLIBNAME)
Expand Down
78 changes: 71 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[![Build Status](https://travis-ci.org/redis/hiredis.png)](https://travis-ci.org/redis/hiredis)

**This Readme reflects the latest changed in the master branch. See [v0.13.3](https://github.com/redis/hiredis/tree/v0.13.3) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).**
**This Readme reflects the latest changed in the master branch. See [v0.14.1](https://github.com/redis/hiredis/tree/v0.14.1) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).**

# HIREDIS

Expand Down Expand Up @@ -500,12 +500,75 @@ if (redisInitiateSSLWithContext(c, ssl) != REDIS_OK) {
}
```

## RESP3 PUSH replies
Redis 6.0 introduced PUSH replies with the reply-type `>`. These messages are generated spontaneously and can arrive at any time, so must be handled using callbacks.

### Default behavior
Hiredis installs handlers on `redisContext` and `redisAsyncContext` by default, which will intercept and free any PUSH replies detected. This means existing code will work as-is after upgrading to Redis 6 and switching to `RESP3`.

### Custom PUSH handler prototypes
The callback prototypes differ between `redisContext` and `redisAsyncContext`.

#### redisContext
```c
void my_push_handler(void *privdata, void *reply) {
/* Handle the reply */

/* Note: We need to free the reply in our custom handler for
blocking contexts. This lets us keep the reply if
we want. */
freeReplyObject(reply);
}
```

#### redisAsyncContext
```c
void my_async_push_handler(redisAsyncContext *ac, void *reply) {
/* Handle the reply */

/* Note: Because async hiredis always frees replies, you should
not call freeReplyObject in an async push callback. */
}
```

### Installing a custom handler
There are two ways to set your own PUSH handlers.

1. Set `push_cb` or `async_push_cb` in the `redisOptions` struct and connect with `redisConnectWithOptions` or `redisAsyncConnectWithOptions`.
```c
redisOptions = {0};
REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
options->push_cb = my_push_handler;
redisContext *context = redisConnectWithOptions(&options);
```
2. Call `redisSetPushCallback` or `redisAsyncSetPushCallback` on a connected context.
```c
redisContext *context = redisConnect("127.0.0.1", 6379);
redisSetPushCallback(context, my_push_handler);
```

_Note `redisSetPushCallback` and `redisAsyncSetPushCallback` both return any currently configured handler, making it easy to override and then return to the old value._

### Specifying no handler
If you have a unique use-case where you don't want hiredis to automatically intercept and free PUSH replies, you will want to configure no handler at all. This can be done in two ways.
1. Set the `REDIS_OPT_NO_PUSH_AUTOFREE` flag in `redisOptions` and leave the callback function pointer `NULL`.
```c
redisOptions = {0};
REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
options->options |= REDIS_OPT_NO_PUSH_AUTOFREE;
redisContext *context = redisConnectWithOptions(&options);
```
3. Call `redisSetPushCallback` with `NULL` once connected.
```c
redisContext *context = redisConnect("127.0.0.1", 6379);
redisSetPushCallback(context, NULL);
```

_Note: With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking`redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._

## Allocator injection

Hiredis uses a pass-thru structure of function pointers defined in
[alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that conttain
the currently configured allocation and deallocation functions. By default they
just point to libc (`malloc`, `calloc`, `realloc`, etc).
Hiredis uses a pass-thru structure of function pointers defined in [alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that contain the currently configured allocation and deallocation functions. By default they just point to libc (`malloc`, `calloc`, `realloc`, etc).

### Overriding

Expand All @@ -532,5 +595,6 @@ hiredisResetAllocators();

## AUTHORS

Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license.
Hiredis was written by Salvatore Sanfilippo (antirez at gmail),
Pieter Noordhuis (pcnoordhuis at gmail), and Michael Grunder
(michael dot grunder at gmail) and is released under the BSD license.
64 changes: 61 additions & 3 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
redisContext *c;
redisAsyncContext *ac;

/* Clear any erroneously set sync callback and flag that we don't want to
* use freeReplyObject by default. */
myOptions.push_cb = NULL;
myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;

myOptions.options |= REDIS_OPT_NONBLOCK;
c = redisConnectWithOptions(&myOptions);
if (c == NULL) {
return NULL;
}

ac = redisAsyncInitialize(c);
if (ac == NULL) {
redisFree(c);
return NULL;
}

/* Set any configured async push handler */
redisAsyncSetPushCallback(ac, myOptions.async_push_cb);

__redisAsyncCopyError(ac);
return ac;
}
Expand Down Expand Up @@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
}
}

static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
if (ac->push_cb != NULL) {
ac->c.flags |= REDIS_IN_CALLBACK;
ac->push_cb(ac, reply);
ac->c.flags &= ~REDIS_IN_CALLBACK;
}
}

/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
Expand All @@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);

/* Run subscription callbacks callbacks with NULL reply */
/* Run subscription callbacks with NULL reply */
if (ac->sub.channels) {
it = dictGetIterator(ac->sub.channels);
if (it != NULL) {
Expand Down Expand Up @@ -459,6 +477,30 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
return REDIS_ERR;
}

#define redisIsSpontaneousPushReply(r) \
(redisIsPushReply(r) && !redisIsSubscribeReply(r))

static int redisIsSubscribeReply(redisReply *reply) {
char *str;
size_t len, off;

/* We will always have at least one string with the subscribe/message type */
if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
reply->element[0]->len < sizeof("message") - 1)
{
return 0;
}

/* Get the string/len moving past 'p' if needed */
off = tolower(reply->element[0]->str[0]) == 'p';
str = reply->element[0]->str + off;
len = reply->element[0]->len - off;

return !strncasecmp(str, "subscribe", len) ||
!strncasecmp(str, "message", len);

}

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
Expand All @@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break;
}

/* Even if the context is subscribed, pending regular callbacks will
* get a reply before pub/sub messages arrive. */
/* Send any non-subscribe related PUSH messages to our PUSH handler
* while allowing subscribe related PUSH messages to pass through.
* This allows existing code to be backward compatible and work in
* either RESP2 or RESP3 mode. */
if (redisIsSpontaneousPushReply(reply)) {
__redisRunPushCallback(ac, reply);
c->reader->fn->freeObject(reply);
continue;
}

/* Even if the context is subscribed, pending regular
* callbacks will get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
Expand Down Expand Up @@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
return status;
}

redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
redisAsyncPushFn *old = ac->push_cb;
ac->push_cb = fn;
return old;
}

int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
if (!ac->c.timeout) {
ac->c.timeout = hi_calloc(1, sizeof(tv));
Expand Down
4 changes: 4 additions & 0 deletions async.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ typedef struct redisAsyncContext {
struct dict *channels;
struct dict *patterns;
} sub;

/* Any configured RESP3 PUSH handler */
redisAsyncPushFn *push_cb;
} redisAsyncContext;

/* Functions that proxy to hiredis */
Expand All @@ -118,6 +121,7 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path);
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);

redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn);
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv);
void redisAsyncDisconnect(redisAsyncContext *ac);
void redisAsyncFree(redisAsyncContext *ac);
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ ENDIF()

ADD_EXECUTABLE(example example.c)
TARGET_LINK_LIBRARIES(example hiredis)

ADD_EXECUTABLE(example-push example-push.c)
TARGET_LINK_LIBRARIES(example-push hiredis)
Loading