Skip to content

Commit

Permalink
Merge pull request #67 from launchdarkly/eb/ch19217/remove-celluloid
Browse files Browse the repository at this point in the history
reimplement SSE client without Celluloid [1 of 2]
  • Loading branch information
eli-darkly authored Jun 22, 2018
2 parents d711aea + f071b9f commit 502e99b
Show file tree
Hide file tree
Showing 11 changed files with 641 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ workflows:
version: 2
test:
jobs:
- test-misc-rubies
# - test-misc-rubies # none of these older Ruby versions are supported on this branch
- test-2.2
- test-2.3
- test-2.4
Expand Down
10 changes: 2 additions & 8 deletions ldclient-rb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "net-http-persistent", "~> 2.9"
spec.add_runtime_dependency "concurrent-ruby", "~> 1.0.4"
spec.add_runtime_dependency "hashdiff", "~> 0.2"
spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.11.0"
spec.add_runtime_dependency "celluloid", "~> 0.18.0.pre" # transitive dep; specified here for more control

if RUBY_VERSION >= "2.2.2"
spec.add_runtime_dependency "nio4r", "< 3" # for maximum ruby version compatibility.
else
spec.add_runtime_dependency "nio4r", "~> 1.1" # for maximum ruby version compatibility.
end
spec.add_runtime_dependency "http_tools", '~> 0.4.5'
spec.add_runtime_dependency "socketry", "~> 0.5.1"
end
39 changes: 16 additions & 23 deletions lib/ldclient-rb/stream.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require "concurrent/atomics"
require "json"
require "celluloid/eventsource"
require "sse_client"

module LaunchDarkly
PUT = :put
Expand Down Expand Up @@ -36,18 +36,18 @@ def start

@config.logger.info { "[LDClient] Initializing stream connection" }

headers =
{
headers = {
'Authorization' => @sdk_key,
'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION
}
opts = {:headers => headers, :with_credentials => true, :proxy => @config.proxy, :read_timeout => READ_TIMEOUT_SECONDS}
@es = Celluloid::EventSource.new(@config.stream_uri + "/all", opts) do |conn|
conn.on(PUT) { |message| process_message(message, PUT) }
conn.on(PATCH) { |message| process_message(message, PATCH) }
conn.on(DELETE) { |message| process_message(message, DELETE) }
conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) }
conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) }
opts = {
headers: headers,
proxy: @config.proxy,
read_timeout: READ_TIMEOUT_SECONDS,
logger: @config.logger
}
@es = SSE::SSEClient.new(@config.stream_uri + "/all", opts) do |conn|
conn.on_event { |event| process_message(event, event.type) }
conn.on_error { |err|
status = err[:status_code]
message = Util.http_error_message(status, "streaming connection", "will retry")
Expand All @@ -69,13 +69,6 @@ def stop
end
end

def stop
if @stopped.make_true
@es.close
@config.logger.info { "[LDClient] Stream connection stopped" }
end
end

private

def process_message(message, method)
Expand All @@ -90,20 +83,20 @@ def process_message(message, method)
@config.logger.info { "[LDClient] Stream initialized" }
@ready.set
elsif method == PATCH
message = JSON.parse(message.data, symbolize_names: true)
data = JSON.parse(message.data, symbolize_names: true)
for kind in [FEATURES, SEGMENTS]
key = key_for_path(kind, message[:path])
key = key_for_path(kind, data[:path])
if key
@feature_store.upsert(kind, message[:data])
@feature_store.upsert(kind, data[:data])
break
end
end
elsif method == DELETE
message = JSON.parse(message.data, symbolize_names: true)
data = JSON.parse(message.data, symbolize_names: true)
for kind in [FEATURES, SEGMENTS]
key = key_for_path(kind, message[:path])
key = key_for_path(kind, data[:path])
if key
@feature_store.delete(kind, key, message[:version])
@feature_store.delete(kind, key, data[:version])
break
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/ldclient-rb/user_filter.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "json"
require "set"

module LaunchDarkly
class UserFilter
Expand Down
4 changes: 4 additions & 0 deletions lib/sse_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
require "sse_client/streaming_http"
require "sse_client/sse_events"
require "sse_client/backoff"
require "sse_client/sse_client"
38 changes: 38 additions & 0 deletions lib/sse_client/backoff.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@

module SSE
#
# A simple backoff algorithm that can be reset at any time, or reset itself after a given
# interval has passed without errors.
#
class Backoff
def initialize(base_interval, max_interval, auto_reset_interval = 60)
@base_interval = base_interval
@max_interval = max_interval
@auto_reset_interval = auto_reset_interval
@attempts = 0
@last_good_time = nil
@jitter_rand = Random.new
end

attr_accessor :base_interval

def next_interval
if !@last_good_time.nil? && (Time.now.to_i - @last_good_time) >= @auto_reset_interval
@attempts = 0
end
@last_good_time = nil
if @attempts == 0
@attempts += 1
return 0
end
@last_good_time = nil
target = ([@base_interval * (2 ** @attempts), @max_interval].min).to_f
@attempts += 1
(target / 2) + @jitter_rand.rand(target / 2)
end

def mark_success
@last_good_time = Time.now.to_i if @last_good_time.nil?
end
end
end
161 changes: 161 additions & 0 deletions lib/sse_client/sse_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
require "concurrent/atomics"
require "logger"
require "thread"
require "uri"

module SSE
#
# A lightweight Server-Sent Events implementation, relying on two gems: socketry for sockets with
# read timeouts, and http_tools for HTTP response parsing. The overall logic is based on
# [https://github.com/Tonkpils/celluloid-eventsource].
#
class SSEClient
DEFAULT_CONNECT_TIMEOUT = 10
DEFAULT_READ_TIMEOUT = 300
DEFAULT_RECONNECT_TIME = 1
MAX_RECONNECT_TIME = 30

def initialize(uri, options = {})
@uri = URI(uri)
@stopped = Concurrent::AtomicBoolean.new(false)

@headers = options[:headers].clone || {}
@connect_timeout = options[:connect_timeout] || DEFAULT_CONNECT_TIMEOUT
@read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT
@logger = options[:logger] || default_logger

proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy]
if proxy
proxyUri = URI(proxy)
if proxyUri.scheme == 'http' || proxyUri.scheme == 'https'
@proxy = proxyUri
end
end

reconnect_time = options[:reconnect_time] || DEFAULT_RECONNECT_TIME
@backoff = Backoff.new(reconnect_time, MAX_RECONNECT_TIME)

@on = { event: ->(_) {}, error: ->(_) {} }
@last_id = nil

yield self if block_given?

@worker = Thread.new do
run_stream
end
end

def on(event_name, &action)
@on[event_name.to_sym] = action
end

def on_event(&action)
@on[:event] = action
end

def on_error(&action)
@on[:error] = action
end

def close
if @stopped.make_true
@worker.raise ShutdownSignal.new
end
end

private

def default_logger
log = ::Logger.new($stdout)
log.level = ::Logger::WARN
log
end

def run_stream
while !@stopped.value
cxn = nil
begin
cxn = connect
read_stream(cxn)
rescue ShutdownSignal
return
rescue StandardError => e
@logger.error { "Unexpected error from event source: #{e.inspect}" }
@logger.debug { "Exception trace: #{e.backtrace}" }
end
cxn.close if !cxn.nil?
end
end

# Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful.
def connect
loop do
interval = @backoff.next_interval
if interval > 0
@logger.warn { "Will retry connection after #{'%.3f' % interval} seconds" }
sleep(interval)
end
begin
cxn = open_connection(build_headers)
if cxn.status != 200
body = cxn.read_all # grab the whole response body in case it has error details
cxn.close
@on[:error].call({status_code: cxn.status, body: body})
elsif cxn.headers["content-type"] && cxn.headers["content-type"].start_with?("text/event-stream")
return cxn # we're good to proceed
end
@logger.error { "Event source returned unexpected content type '#{cxn.headers["content-type"]}'" }
rescue StandardError => e
@logger.error { "Unexpected error from event source: #{e.inspect}" }
@logger.debug { "Exception trace: #{e.backtrace}" }
cxn.close if !cxn.nil?
end
# if unsuccessful, continue the loop to connect again
end
end

# Just calls the StreamingHTTPConnection constructor - factored out for test purposes
def open_connection(headers)
StreamingHTTPConnection.new(@uri, @proxy, headers, @connect_timeout, @read_timeout)
end

# Pipe the output of the StreamingHTTPConnection into the EventParser, and dispatch events as
# they arrive.
def read_stream(cxn)
event_parser = EventParser.new(cxn.read_lines)
event_parser.items.each do |item|
case item
when SSEEvent
dispatch_event(item)
when SSESetRetryInterval
@backoff.base_interval = event.milliseconds.t-Of / 1000
end
end
end

def dispatch_event(event)
@last_id = event.id

# Tell the Backoff object that as of the current time, we have succeeded in getting some data. It
# uses that information so it can automatically reset itself if enough time passes between failures.
@backoff.mark_success

# Pass the event to the caller
@on[:event].call(event)
end

def build_headers
h = {
'Accept' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'Host' => @uri.host
}
h['Last-Event-Id'] = @last_id if !@last_id.nil?
h.merge(@headers)
end
end

# Custom exception that we use to tell the worker thread to stop
class ShutdownSignal < StandardError
end
end
67 changes: 67 additions & 0 deletions lib/sse_client/sse_events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

module SSE
# Server-Sent Event type used by SSEClient and EventParser.
SSEEvent = Struct.new(:type, :data, :id)

SSESetRetryInterval = Struct.new(:milliseconds)

#
# Accepts lines of text via an iterator, and parses them into SSE messages.
#
class EventParser
def initialize(lines)
@lines = lines
reset_buffers
end

# Generator that parses the input interator and returns instances of SSEEvent or SSERetryInterval.
def items
Enumerator.new do |gen|
@lines.each do |line|
line.chomp!
if line.empty?
event = maybe_create_event
reset_buffers
gen.yield event if !event.nil?
else
case line
when /^(\w+): ?(.*)$/
item = process_field($1, $2)
gen.yield item if !item.nil?
end
end
end
end
end

private

def reset_buffers
@id = nil
@type = nil
@data = ""
end

def process_field(name, value)
case name
when "event"
@type = value.to_sym
when "data"
@data << "\n" if !@data.empty?
@data << value
when "id"
@id = value
when "retry"
if /^(?<num>\d+)$/ =~ value
return SSESetRetryInterval.new(num.to_i)
end
end
nil
end

def maybe_create_event
return nil if @data.empty?
SSEEvent.new(@type || :message, @data, @id)
end
end
end
Loading

0 comments on commit 502e99b

Please sign in to comment.