From 98e8e605e299fb5dfc548be86d1f162a390cda94 Mon Sep 17 00:00:00 2001 From: Jordan Kaye Date: Tue, 5 Jan 2016 15:37:23 -0600 Subject: [PATCH] Fixed busy subscription loop --- src/client.jl | 5 ----- src/commands.jl | 4 ++-- src/parser.jl | 10 +++++----- test/redis_tests.jl | 1 + 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/client.jl b/src/client.jl index c81ade3..6dec266 100644 --- a/src/client.jl +++ b/src/client.jl @@ -44,11 +44,6 @@ end function subscription_loop(conn::SubscriptionConnection, err_callback::Function) while is_connected(conn) try - # Probably could do something better here, but we can't block - # forever or else subsequent subscribe commands on the same - # socket will block until a message is received - sleep(.1) - nb_available(conn.socket) > 0 || continue l = getline(conn.socket) reply = parseline(l, conn.socket) message = SubscriptionMessage(reply) diff --git a/src/commands.jl b/src/commands.jl index 285dbcb..0b0d343 100644 --- a/src/commands.jl +++ b/src/commands.jl @@ -251,7 +251,7 @@ end @redisfunction "publish" Integer channel message function _subscribe(conn::SubscriptionConnection, channels::Array) - execute_command(conn, unshift!(channels, "subscribe")) + execute_command_without_reply(conn, unshift!(channels, "subscribe")) end function subscribe(conn::SubscriptionConnection, channel::AbstractString, callback::Function) @@ -274,7 +274,7 @@ function unsubscribe(conn::SubscriptionConnection, channels...) end function _psubscribe(conn::SubscriptionConnection, patterns::Array) - execute_command(conn, unshift!(patterns, "psubscribe")) + execute_command_without_reply(conn, unshift!(patterns, "psubscribe")) end function psubscribe(conn::SubscriptionConnection, pattern::AbstractString, callback::Function) diff --git a/src/parser.jl b/src/parser.jl index 336e51c..63dd006 100644 --- a/src/parser.jl +++ b/src/parser.jl @@ -76,18 +76,18 @@ function pack_command(command) packed_command end - - -function execute_command(conn::RedisConnectionBase, command) +function execute_command_without_reply(conn::RedisConnectionBase, command) is_connected(conn) || throw(ConnectionException("Socket is disconnected")) send_command(conn, pack_command(command)) +end + +function execute_command(conn::RedisConnectionBase, command) + execute_command_without_reply(conn, command) l = getline(conn.socket) reply = parseline(l, conn.socket) return reply end - - baremodule SubscriptionMessageType const Message = 0 const Pmessage = 1 diff --git a/test/redis_tests.jl b/test/redis_tests.jl index a013c32..b69979d 100644 --- a/test/redis_tests.jl +++ b/test/redis_tests.jl @@ -45,6 +45,7 @@ subs = open_subscription(conn, g) x = Any[] f(y) = push!(x, y) subscribe(subs, "channel", f) +subscribe(subs, "duplicate", f) @test publish(conn, "channel", "hello, world!") == 1 sleep(2) @test x == ["hello, world!"]