-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpubsub_core_async.repl
97 lines (76 loc) · 2.86 KB
/
pubsub_core_async.repl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
;; An example of how to use Redis pub/sub with core.async.
(require '[rad.api :as rad])
;; Get core.async (presuming Clojure 1.12+).
(require '[clojure.repl.deps :as deps])
(binding [*repl* true]
(deps/add-lib 'org.clojure/core.async))
(require '[clojure.core.async :as async])
;; Start Rad.
(def redis (rad/client :decode rad/bytes->str)) #_(redis)
;; Make a core.async channel to put messages into.
(def chan (async/chan 256))
;; Subscribe to Redis channels "channel-1" and "channel-2".
;;
;; Attach a callback function that Rad calls every time Redis sends it a
;; message published into either Redis pub/sub channel. The callback function
;; puts every message into the core.async channel.
;;
;; (Rad executes callbacks in a virtual thread, so doing a blocking put here
;; should not be a problem.)
@(redis ^{:cb (fn [msg] (async/>!! chan msg))}
[:SUBSCRIBE "channel-1" "channel-2"])
;; Publish a message.
@(redis [:PUBLISH "channel-1" "Hello, world!"])
;; The first message that appears in the core.async channel is the subscription
;; confirmation message Redis sends every time a client subscribes to a
;; channel.
(async/<!! chan)
;; The second message is the subscription confirmation message for channel-2.
(async/<!! chan)
;; The third message is the message we published.
(async/<!! chan)
;; Make helper functions.
(defn message?
"Return true if the Redis push event is a pub/sub message."
[push-event]
(= "message" (:type push-event)))
(defn channel-name
"Given a Redis push event, return the name of the channel the message was
published into."
[push-event]
(-> push-event :data (nth 0)))
(defn payload
"Given a Redis push event, return the message payload."
[push-event]
(-> push-event :data peek))
;; Make a pub that partitions Redis pub/sub messages by Redis channel name.
(def pub (async/pub chan channel-name))
;; Make a core.async channels for messages published into channel-1 and
;; channel-2.
(def channel-1 (async/chan 1 (comp (filter message?) (map payload))))
(def channel-2 (async/chan 1 (comp (filter message?) (map payload))))
;; Subscribe to messages published to "channel-1" and "channel-2" Redis
;; channels.
(async/sub pub "channel-1" channel-1)
(async/sub pub "channel-2" channel-2)
;; Publish a message to both channels.
@(redis [:PUBLISH "channel-1" "C1"])
@(redis [:PUBLISH "channel-2" "C2"])
;; A message appears in both channels.
(async/<!! channel-1)
(async/<!! channel-2)
;; A helper function for taking from a core.async channel with timeout.
(defn try-take!
[chan timeout-ms]
(let [timeout-chan (async/timeout timeout-ms)
[val port] (async/alts!! [chan timeout-chan])]
(if (identical? port timeout-chan)
::timeout
val)))
;; Neither channel has any more messages.
(try-take! channel-1 1000)
(try-take! channel-2 1000)
;; Close all channels.
(async/close! chan)
(async/close! channel-1)
(async/close! channel-2)