diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 302c8d7d..ee80a8e0 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -30,5 +30,6 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "net-http-persistent", "~> 2.9" spec.add_runtime_dependency "concurrent-ruby", "~> 1.0.0" spec.add_runtime_dependency "hashdiff", "~> 0.2" - spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.4" + spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.4" + spec.add_runtime_dependency "waitutil", "0.2" end diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index 1c585f5a..549ec43f 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -1,7 +1,11 @@ require "ldclient-rb/version" -require "ldclient-rb/settings" +require "ldclient-rb/evaluation" require "ldclient-rb/ldclient" -require "ldclient-rb/store" +require "ldclient-rb/cache_store" require "ldclient-rb/config" require "ldclient-rb/newrelic" require "ldclient-rb/stream" +require "ldclient-rb/polling" +require "ldclient-rb/events" +require "ldclient-rb/feature_store" +require "ldclient-rb/requestor" diff --git a/lib/ldclient-rb/store.rb b/lib/ldclient-rb/cache_store.rb similarity index 100% rename from lib/ldclient-rb/store.rb rename to lib/ldclient-rb/cache_store.rb diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index edcde25d..074bda6b 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -31,9 +31,14 @@ class Config # connections in seconds. # @option opts [Float] :connect_timeout (2) The connect timeout for network # connections in seconds. - # @option opts [Object] :store A cache store for the Faraday HTTP caching + # @option opts [Object] :cache_store A cache store for the Faraday HTTP caching # library. Defaults to the Rails cache in a Rails environment, or a # thread-safe in-memory store otherwise. + # @option opts [Boolean] :offline (false) Whether the client should be initialized in + # offline mode. In offline mode, default values are returned for all flags and no + # remote network requests are made. + # @option opts [Float] :poll_interval (30) The number of seconds between polls for flag updates + # if streaming is off. # # @return [type] [description] def initialize(opts = {}) @@ -42,14 +47,14 @@ def initialize(opts = {}) @events_uri = (opts[:events_uri] || Config.default_events_uri).chomp("/") @capacity = opts[:capacity] || Config.default_capacity @logger = opts[:logger] || Config.default_logger - @store = opts[:store] || Config.default_store + @cache_store = opts[:cache_store] || Config.default_cache_store @flush_interval = opts[:flush_interval] || Config.default_flush_interval @connect_timeout = opts[:connect_timeout] || Config.default_connect_timeout @read_timeout = opts[:read_timeout] || Config.default_read_timeout @feature_store = opts[:feature_store] || Config.default_feature_store @stream = opts.has_key?(:stream) ? opts[:stream] : Config.default_stream - @log_timings = opts.has_key?(:log_timings) ? opts[:log_timings] : Config.default_log_timings - @debug_stream = opts.has_key?(:debug_stream) ? opts[:debug_stream] : Config.default_debug_stream + @offline = opts.has_key?(:offline) ? opts[:offline] : Config.default_offline + @poll_interval = opts.has_key?(:poll_interval) && opts[:poll_interval] > 1 ? opts[:poll_interval] : Config.default_poll_interval end # @@ -79,13 +84,9 @@ def stream? @stream end - # - # Whether we should debug streaming mode. If set, the client will fetch features via polling - # and compare the retrieved feature with the value in the feature store - # - # @return [Boolean] True if we should debug streaming mode - def debug_stream? - @debug_stream + # TODO docs + def offline? + @offline end # @@ -95,6 +96,11 @@ def debug_stream? # @return [Float] The configured number of seconds between flushes of the event buffer. attr_reader :flush_interval + # + # The number of seconds to wait before polling for feature flag updates. This option has no + # effect unless streaming is disabled + attr_reader :poll_interval + # # The configured logger for the LaunchDarkly client. The client library uses the log to # print warning and error messages. @@ -117,7 +123,7 @@ def debug_stream? # 'read' and 'write' requests. # # @return [Object] The configured store for the Faraday HTTP caching library. - attr_reader :store + attr_reader :cache_store # # The read timeout for network connections in seconds. @@ -132,16 +138,7 @@ def debug_stream? attr_reader :connect_timeout # - # Whether timing information should be logged. If it is logged, it will be logged to the DEBUG - # level on the configured logger. This can be very verbose. - # - # @return [Boolean] True if timing information should be logged. - def log_timings? - @log_timings - end - - # - # TODO docs + # A store for feature flag configuration rules. # attr_reader :feature_store @@ -170,7 +167,7 @@ def self.default_events_uri "https://events.launchdarkly.com" end - def self.default_store + def self.default_cache_store defined?(Rails) && Rails.respond_to?(:cache) ? Rails.cache : ThreadSafeMemoryStore.new end @@ -190,20 +187,20 @@ def self.default_logger defined?(Rails) && Rails.respond_to?(:logger) ? Rails.logger : ::Logger.new($stdout) end - def self.default_log_timings - false - end - def self.default_stream true end def self.default_feature_store - nil + InMemoryFeatureStore.new end - def self.default_debug_stream + def self.default_offline false end + + def self.default_poll_interval + 1 + end end end diff --git a/lib/ldclient-rb/evaluation.rb b/lib/ldclient-rb/evaluation.rb new file mode 100644 index 00000000..03b1ee3d --- /dev/null +++ b/lib/ldclient-rb/evaluation.rb @@ -0,0 +1,265 @@ +require "date" + +module LaunchDarkly + + module Evaluation + BUILTINS = [:key, :ip, :country, :email, :firstName, :lastName, :avatar, :name, :anonymous] + + OPERATORS = { + in: + lambda do |a, b| + a == b + end, + endsWith: + lambda do |a, b| + (a.is_a? String) && (a.end_with? b) + end, + startsWith: + lambda do |a, b| + (a.is_a? String) && (a.start_with? b) + end, + matches: + lambda do |a, b| + (b.is_a? String) && !(Regexp.new b).match(a).nil? + end, + contains: + lambda do |a, b| + (a.is_a? String) && (a.include? b) + end, + lessThan: + lambda do |a, b| + (a.is_a? Numeric) && (a < b) + end, + lessThanOrEqual: + lambda do |a, b| + (a.is_a? Numeric) && (a <= b) + end, + greaterThan: + lambda do |a, b| + (a.is_a? Numeric) && (a > b) + end, + greaterThanOrEqual: + lambda do |a, b| + (a.is_a? Numeric) && (a >= b) + end, + before: + lambda do |a, b| + begin + if a.is_a? String + a = DateTime.rfc3339(a).strftime('%Q').to_i + end + if b.is_a? String + b = DateTime.rfc3339(b).strftime('%Q').to_i + end + (a.is_a? Numeric) ? a < b : false + rescue => e + false + end + end, + after: + lambda do |a, b| + begin + if a.is_a? String + a = DateTime.rfc3339(a).strftime('%Q').to_i + end + if b.is_a? String + b = DateTime.rfc3339(b).strftime('%Q').to_i + end + (a.is_a? Numeric) ? a > b : false + rescue => e + false + end + end + } + + class EvaluationError < StandardError + end + + # Evaluates a feature flag, returning a hash containing the evaluation result and any events + # generated during prerequisite evaluation. Raises EvaluationError if the flag is not well-formed + # Will return nil, but not raise an exception, indicating that the rules (including fallthrough) did not match + # In that case, the caller should return the default value. + def evaluate(flag, user, store) + if flag.nil? + raise EvaluationError, "Flag does not exist" + end + + if user.nil? || user[:key].nil? + raise EvaluationError, "Invalid user" + end + + events = [] + + if flag[:on] + res = eval_internal(flag, user, store, events) + + return {value: res, events: events} if !res.nil? + end + + if !flag[:offVariation].nil? && flag[:offVariation] < flag[:variations].length + value = flag[:variations][flag[:offVariation]] + return {value: value, events: events} + end + + {value: nil, events: events} + end + + def eval_internal(flag, user, store, events) + failed_prereq = false + # Evaluate prerequisites, if any + if !flag[:prerequisites].nil? + flag[:prerequisites].each do |prerequisite| + + prereq_flag = store.get(prerequisite[:key]) + + if prereq_flag.nil? || !prereq_flag[:on] + failed_prereq = true + else + begin + prereq_res = eval_internal(prereq_flag, user, store, events) + variation = get_variation(prereq_flag, prerequisite[:variation]) + events.push(kind: "feature", key: prereq_flag[:key], value: prereq_res) + if prereq_res.nil? || prereq_res!= variation + failed_prereq = true + end + rescue => exn + @config.logger.error("[LDClient] Error evaluating prerequisite: #{exn.inspect}") + failed_prereq = true + end + end + end + + if failed_prereq + return nil + end + end + # The prerequisites were satisfied. + # Now walk through the evaluation steps and get the correct + # variation index + eval_rules(flag, user) + end + + def eval_rules(flag, user) + # Check user target matches + if !flag[:targets].nil? + flag[:targets].each do |target| + if !target[:values].nil? + target[:values].each do |value| + return get_variation(flag, target[:variation]) if value == user[:key] + end + end + end + end + + # Check custom rules + if !flag[:rules].nil? + flag[:rules].each do |rule| + return variation_for_user(rule, user, flag) if rule_match_user(rule, user) + end + end + + # Check the fallthrough rule + if !flag[:fallthrough].nil? + return variation_for_user(flag[:fallthrough], user, flag) + end + + # Not even the fallthrough matched-- return the off variation or default + nil + end + + def get_variation(flag, index) + if index >= flag[:variations].length + raise EvaluationError, "Invalid variation index" + end + flag[:variations][index] + end + + def rule_match_user(rule, user) + return false if !rule[:clauses] + + rule[:clauses].each do |clause| + return false if !clause_match_user(clause, user) + end + + return true + end + + def clause_match_user(clause, user) + val = user_value(user, clause[:attribute]) + return false if val.nil? + + op = OPERATORS[clause[:op].to_sym] + + if op.nil? + raise EvaluationError, "Unsupported operator #{clause[:op]} in evaluation" + end + + if val.is_a? Enumerable + val.each do |v| + return maybe_negate(clause, true) if match_any(op, v, clause[:values]) + end + return maybe_negate(clause, false) + end + + maybe_negate(clause, match_any(op, val, clause[:values])) + end + + def variation_for_user(rule, user, flag) + if !rule[:variation].nil? # fixed variation + return get_variation(flag, rule[:variation]) + elsif !rule[:rollout].nil? # percentage rollout + rollout = rule[:rollout] + bucket_by = rollout[:bucketBy].nil? ? "key" : rollout[:bucketBy] + bucket = bucket_user(user, flag[:key], bucket_by, flag[:salt]) + sum = 0; + rollout[:variations].each do |variate| + sum += variate[:weight].to_f / 100000.0 + return get_variation(flag, variate[:variation]) if bucket < sum + end + nil + else # the rule isn't well-formed + raise EvaluationError, "Rule does not define a variation or rollout" + end + end + + def bucket_user(user, key, bucket_by, salt) + return nil unless user[:key] + + id_hash = user_value(user, bucket_by) + + if user[:secondary] + id_hash += "." + user[:secondary] + end + + hash_key = "%s.%s.%s" % [key, salt, id_hash] + + hash_val = (Digest::SHA1.hexdigest(hash_key))[0..14] + hash_val.to_i(16) / Float(0xFFFFFFFFFFFFFFF) + end + + def user_value(user, attribute) + attribute = attribute.to_sym + + if BUILTINS.include? attribute + user[attribute] + elsif !user[:custom].nil? + user[:custom][attribute] + else + nil + end + end + + def maybe_negate(clause, b) + clause[:negate] ? !b : b + end + + def match_any(op, value, values) + values.each do |v| + return true if op.call(value, v) + end + return false + end + end + +end + diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb new file mode 100644 index 00000000..47b4ac23 --- /dev/null +++ b/lib/ldclient-rb/events.rb @@ -0,0 +1,75 @@ +require "thread" +require "faraday" + +module LaunchDarkly + + class EventProcessor + def initialize(api_key, config) + @queue = Queue.new + @api_key = api_key + @config = config + @client = Faraday.new + @worker = create_worker + end + + def create_worker + Thread.new do + loop do + begin + flush + sleep(@config.flush_interval) + rescue StandardError => exn + log_exception(__method__.to_s, exn) + end + end + end + end + + def post_flushed_events(events) + res = @client.post (@config.events_uri + "/bulk") do |req| + req.headers["Authorization"] = "api_key " + @api_key + req.headers["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION + req.headers["Content-Type"] = "application/json" + req.body = events.to_json + req.options.timeout = @config.read_timeout + req.options.open_timeout = @config.connect_timeout + end + if res.status / 100 != 2 + @config.logger.error("[LDClient] Unexpected status code while processing events: #{res.status}") + end + end + + def flush + events = [] + begin + loop do + events << @queue.pop(true) + end + rescue ThreadError + end + + if !events.empty? + post_flushed_events(events) + end + end + + def add_event(event) + return if @offline + + if @queue.length < @config.capacity + event[:creationDate] = (Time.now.to_f * 1000).to_i + @config.logger.debug("[LDClient] Enqueueing event: #{event.to_json}") + @queue.push(event) + + if !@worker.alive? + @worker = create_worker + end + else + @config.logger.warn("[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events.") + end + end + + private :create_worker, :post_flushed_events + + end +end \ No newline at end of file diff --git a/lib/ldclient-rb/feature_store.rb b/lib/ldclient-rb/feature_store.rb new file mode 100644 index 00000000..332ed804 --- /dev/null +++ b/lib/ldclient-rb/feature_store.rb @@ -0,0 +1,60 @@ +require "concurrent/atomics" + +module LaunchDarkly + + class InMemoryFeatureStore + def initialize + @features = Hash.new + @lock = Concurrent::ReadWriteLock.new + @initialized = Concurrent::AtomicBoolean.new(false) + end + + def get(key) + @lock.with_read_lock do + f = @features[key.to_sym] + (f.nil? || f[:deleted]) ? nil : f + end + end + + def all + @lock.with_read_lock do + @features.select { |_k, f| not f[:deleted] } + end + end + + def delete(key, version) + @lock.with_write_lock do + old = @features[key.to_sym] + + if !old.nil? && old[:version] < version + old[:deleted] = true + old[:version] = version + @features[key.to_sym] = old + elsif old.nil? + @features[key.to_sym] = { deleted: true, version: version } + end + end + end + + def init(fs) + @lock.with_write_lock do + @features.replace(fs) + @initialized.make_true + end + end + + def upsert(key, feature) + @lock.with_write_lock do + old = @features[key.to_sym] + + if old.nil? || old[:version] < feature[:version] + @features[key.to_sym] = feature + end + end + end + + def initialized? + @initialized.value + end + end +end \ No newline at end of file diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index c34b16e6..14722c32 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -1,22 +1,17 @@ -require "faraday/http_cache" -require "json" require "digest/sha1" -require "thread" require "logger" -require "net/http/persistent" require "benchmark" -require "hashdiff" +require "waitutil" +require "json" module LaunchDarkly - BUILTINS = [:key, :ip, :country, :email, :firstName, :lastName, :avatar, :name, :anonymous] - # # A client for the LaunchDarkly API. Client instances are thread-safe. Users # should create a single client instance for the lifetime of the application. # # class LDClient - include Settings + include Evaluation # # Creates a new client instance that connects to LaunchDarkly. A custom # configuration parameter can also supplied to specify advanced options, @@ -27,70 +22,36 @@ class LDClient # @param config [Config] an optional client configuration object # # @return [LDClient] The LaunchDarkly client instance - def initialize(api_key, config = Config.default) - @queue = Queue.new + def initialize(api_key, config = Config.default, wait_for_sec = 5) @api_key = api_key @config = config - @client = Faraday.new do |builder| - builder.use :http_cache, store: @config.store - - builder.adapter :net_http_persistent - end - @offline = false - - if @config.stream? - @stream_processor = StreamProcessor.new(api_key, config) - end - - @worker = create_worker - end - - def flush - events = [] - begin - loop do - events << @queue.pop(true) + @store = config.feature_store + requestor = Requestor.new(api_key, config) + + if !@config.offline? + if @config.stream? + @update_processor = StreamProcessor.new(api_key, config, requestor) + else + @update_processor = PollingProcessor.new(config, requestor) end - rescue ThreadError + @update_processor.start end - if !events.empty? - post_flushed_events(events) - end - end + @event_processor = EventProcessor.new(api_key, config) - def post_flushed_events(events) - res = log_timings("Flush events") do - next @client.post (@config.events_uri + "/bulk") do |req| - req.headers["Authorization"] = "api_key " + @api_key - req.headers["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION - req.headers["Content-Type"] = "application/json" - req.body = events.to_json - req.options.timeout = @config.read_timeout - req.options.open_timeout = @config.connect_timeout - end - end - if res.status / 100 != 2 - @config.logger.error("[LDClient] Unexpected status code while processing events: #{res.status}") - end - end - - def create_worker - Thread.new do - loop do - begin - flush - - sleep(@config.flush_interval) - rescue StandardError => exn - log_exception(__method__.to_s, exn) + if !@config.offline? && wait_for_sec > 0 + begin + WaitUtil.wait_for_condition("LaunchDarkly client initialization", :timeout_sec => wait_for_sec, :delay_sec => 0.1) do + @update_processor.initialized? end + rescue WaitUtil::TimeoutError + @config.logger.error("[LDClient] Timeout encountered waiting for LaunchDarkly client initialization") end end end - def get_flag?(key, user, default = false) - toggle?(key, user, default) + def flush + @event_processor.flush end # @@ -128,47 +89,43 @@ def get_flag?(key, user, default = false) # @return [Boolean] whether or not the flag should be enabled, or the # default value if the flag is disabled on the LaunchDarkly control panel def toggle?(key, user, default = false) - return default if @offline + return default if @config.offline? unless user @config.logger.error("[LDClient] Must specify user") + @event_processor.add_event(kind: "feature", key: key, value: default, default: default) return default end - sanitize_user(user) - - if @config.stream? && !@stream_processor.started? - @stream_processor.start - end - if @config.stream? && @stream_processor.initialized? - feature = get_streamed_flag(key) - else - feature = get_flag_int(key) + if !@update_processor.initialized? + @config.logger.error("[LDClient] Client has not finished initializing. Returning default value") + @event_processor.add_event(kind: "feature", key: key, value: default, default: default) + return default end - value = evaluate(feature, user) - value = value.nil? ? default : value - add_event(kind: "feature", key: key, user: user, value: value, default: default) - LDNewRelic.annotate_transaction(key, value) - return value - rescue StandardError => error - log_exception(__method__.to_s, error) - default - end - - def add_event(event) - return if @offline - - if @queue.length < @config.capacity - event[:creationDate] = (Time.now.to_f * 1000).to_i - @queue.push(event) + sanitize_user(user) + feature = @store.get(key) - if !@worker.alive? - @worker = create_worker + begin + res = evaluate(feature, user, @store) + if !res[:events].nil? + res[:events].each do |event| + @event_processor.add_event(event) + end + end + if !res[:value].nil? + @event_processor.add_event(kind: "feature", key: key, user: user, value: res[:value], default: default) + return res[:value] + else + @config.logger.debug("[LDClient] Result value is null in toggle") + @event_processor.add_event(kind: "feature", key: key, user: user, value: default, default: default) + return default + end + rescue => exn + @config.logger.warn("[LDClient] Error evaluating feature flag: #{exn.inspect}. \nTrace: #{exn.backtrace}") + @event_processor.add_event(kind: "feature", key: key, user: user, value: default, default: default) + return default end - else - @config.logger.warn("[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events.") - end end # @@ -178,19 +135,7 @@ def add_event(event) # def identify(user) sanitize_user(user) - add_event(kind: "identify", key: user[:key], user: user) - end - - def set_offline - @offline = true - end - - def set_online - @offline = false - end - - def is_offline? - @offline + @event_processor.add_event(kind: "identify", key: user[:key], user: user) end # @@ -203,204 +148,19 @@ def is_offline? # @return [void] def track(event_name, user, data) sanitize_user(user) - add_event(kind: "custom", key: event_name, user: user, data: data) - end - - # - # Returns the key of every feature flag - # - def all_keys - all_flags.keys + @event_processor.add_event(kind: "custom", key: event_name, user: user, data: data) end # - # Returns all feature flags + # Returns all feature flag values for the given user # - def all_flags - return Hash.new if @offline + def all_flags(user) + return Hash.new if @config.offline? - if @config.stream? && !@stream_processor.started? - @stream_processor.start - end - - if @config.stream? && @stream_processor.initialized? - @stream_processor.get_all_features - else - res = make_request "/api/eval/features" - - if res.status / 100 == 2 - JSON.parse(res.body, symbolize_names: true) - else - @config.logger.error("[LDClient] Unexpected status code #{res.status}") - Hash.new - end - end - end - - def get_user_settings(user) - Hash[all_flags.map { |key, feature| [key, evaluate(feature, user)]}] - end - - def get_streamed_flag(key) - feature = get_flag_stream(key) - if @config.debug_stream? - polled = get_flag_int(key) - diff = HashDiff.diff(feature, polled) - if not diff.empty? - @config.logger.error("Streamed flag differs from polled flag " + diff.to_s) - end - end - feature - end - - def get_flag_stream(key) - @stream_processor.get_feature(key) - end + features = @store.all - def get_flag_int(key) - res = log_timings("Feature request") do - next make_request "/api/eval/features/" + key - end - - if res.status == 401 - @config.logger.error("[LDClient] Invalid API key") - return nil - end - - if res.status == 404 - @config.logger.error("[LDClient] Unknown feature key: #{key}") - return nil - end - - if res.status / 100 != 2 - @config.logger.error("[LDClient] Unexpected status code #{res.status}") - return nil - end - - JSON.parse(res.body, symbolize_names: true) - end - - def make_request(path) - @client.get (@config.base_uri + path) do |req| - req.headers["Authorization"] = "api_key " + @api_key - req.headers["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION - req.options.timeout = @config.read_timeout - req.options.open_timeout = @config.connect_timeout - end - end - - def param_for_user(feature, user) - return nil unless user[:key] - - id_hash = user[:key] - if user[:secondary] - id_hash += "." + user[:secondary] - end - - hash_key = "%s.%s.%s" % [feature[:key], feature[:salt], id_hash] - - hash_val = (Digest::SHA1.hexdigest(hash_key))[0..14] - hash_val.to_i(16) / Float(0xFFFFFFFFFFFFFFF) - end - - def match_target?(target, user) - attrib = target[:attribute].to_sym - - if BUILTINS.include?(attrib) - return false unless user[attrib] - - u_value = user[attrib] - return target[:values].include? u_value - else # custom attribute - return false unless user[:custom] - return false unless user[:custom].include? attrib - - u_value = user[:custom][attrib] - if u_value.is_a? Array - return ! ((target[:values] & u_value).empty?) - else - return target[:values].include? u_value - end - - return false - end - end - - def match_user?(variation, user) - if variation[:userTarget] - return match_target?(variation[:userTarget], user) - end - false - end - - def find_user_match(feature, user) - feature[:variations].each do |variation| - return variation[:value] if match_user?(variation, user) - end - nil - end - - def match_variation?(variation, user) - variation[:targets].each do |target| - if !!variation[:userTarget] && target[:attribute].to_sym == :key - next - end - - if match_target?(target, user) - return true - end - end - false - end - - def find_target_match(feature, user) - feature[:variations].each do |variation| - return variation[:value] if match_variation?(variation, user) - end - nil - end - - def find_weight_match(feature, param) - total = 0.0 - feature[:variations].each do |variation| - total += variation[:weight].to_f / 100.0 - - return variation[:value] if param < total - end - - nil - end - - def evaluate(feature, user) - return nil if feature.nil? - return nil unless feature[:on] - - param = param_for_user(feature, user) - return nil if param.nil? - - value = find_user_match(feature, user) - return value if !value.nil? - - value = find_target_match(feature, user) - return value if !value.nil? - - find_weight_match(feature, param) - end - - def log_timings(label, &block) - return block.call unless @config.log_timings? && @config.logger.debug? - res = nil - exn = nil - bench = Benchmark.measure do - begin - res = block.call - rescue StandardError => e - exn = e - end - end - @config.logger.debug { "[LDClient] #{label} timing: #{bench}".chomp } - raise exn if exn - res + # TODO rescue if necessary + Hash[features{|k,f| [k, evaluate(f, user, @store)[:value]] }] end def log_exception(caller, exn) @@ -415,9 +175,6 @@ def sanitize_user(user) end end - private :post_flushed_events, :add_event, :get_streamed_flag, - :get_flag_stream, :get_flag_int, :make_request, :param_for_user, - :match_target?, :match_user?, :match_variation?, :evaluate, - :create_worker, :log_timings, :log_exception, :sanitize_user + private :evaluate, :log_exception, :sanitize_user end end diff --git a/lib/ldclient-rb/polling.rb b/lib/ldclient-rb/polling.rb new file mode 100644 index 00000000..f3e3bf69 --- /dev/null +++ b/lib/ldclient-rb/polling.rb @@ -0,0 +1,56 @@ +require "concurrent/atomics" +require "thread" + +module LaunchDarkly + class PollingProcessor + + def initialize(config, requestor) + @config = config + @requestor = requestor + @initialized = Concurrent::AtomicBoolean.new(false) + @started = Concurrent::AtomicBoolean.new(false) + end + + def initialized? + @initialized.value + end + + def start + return unless @started.make_true + @config.logger.info("[LDClient] Initializing polling connection") + create_worker + end + + def poll + flags = @requestor.request_all_flags + if flags + @config.feature_store.init(flags) + if @initialized.make_true + @config.logger.info("[LDClient] Polling connection initialized") + end + end + end + + def create_worker + Thread.new do + @config.logger.debug("[LDClient] Starting polling worker") + loop do + begin + started_at = Time.now + poll + delta = @config.poll_interval - (Time.now - started_at) + if delta > 0 + sleep(delta) + end + rescue StandardError => exn + @config.logger.error("[LDClient] Exception while polling: #{exn.inspect}") + # TODO: log_exception(__method__.to_s, exn) + end + end + end + end + + + private :poll, :create_worker + end +end \ No newline at end of file diff --git a/lib/ldclient-rb/requestor.rb b/lib/ldclient-rb/requestor.rb new file mode 100644 index 00000000..72c8d493 --- /dev/null +++ b/lib/ldclient-rb/requestor.rb @@ -0,0 +1,56 @@ +require "json" +require "net/http/persistent" +require "faraday/http_cache" + +module LaunchDarkly + + class Requestor + def initialize(api_key, config) + @api_key = api_key + @config = config + @client = Faraday.new do |builder| + builder.use :http_cache, store: @config.cache_store + + builder.adapter :net_http_persistent + end + end + + def request_all_flags() + make_request("/sdk/latest-flags") + end + + def request_flag(key) + make_request("/sdk/latest-flags/" + key) + end + + def make_request(path) + res = @client.get (@config.base_uri + path) do |req| + req.headers["Authorization"] = "api_key " + @api_key + req.headers["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION + req.options.timeout = @config.read_timeout + req.options.open_timeout = @config.connect_timeout + end + + if res.status == 401 + @config.logger.error("[LDClient] Invalid API key") + return nil + end + + if res.status == 404 + @config.logger.error("[LDClient] Resource not found") + return nil + end + + if res.status / 100 != 2 + @config.logger.error("[LDClient] Unexpected status code #{res.status}") + return nil + end + + JSON.parse(res.body, symbolize_names: true) + end + + private :make_request + + end + +end \ No newline at end of file diff --git a/lib/ldclient-rb/settings.rb b/lib/ldclient-rb/settings.rb deleted file mode 100644 index fad2cfd9..00000000 --- a/lib/ldclient-rb/settings.rb +++ /dev/null @@ -1,40 +0,0 @@ -module LaunchDarkly - - # - # Module to manage user flag settings - # - module Settings - # - # Specifically enable or disable a feature flag for a user based - # on their key. - # - # @param user_key [String] the key of the user - # @param flag_key [String] the unique feature key for the feature flag, as shown - # on the LaunchDarkly dashboard - # @param setting [Boolean] the new setting, one of: - # true: the feature is always on - # false: the feature is never on - # nil: remove the setting (assign user per defined rules) - def update_user_flag_setting(user_key, flag_key, setting=nil) - unless user_key - @config.logger.error("[LDClient] Must specify user") - return - end - - res = log_timings('update_user_flag_setting') do - @client.put("#{@config.base_uri}/api/users/#{user_key}/features/#{flag_key}") do |req| - req.headers['Authorization'] = "api_key #{@api_key}" - req.headers['User-Agent'] = "RubyClient/#{LaunchDarkly::VERSION}" - req.headers['Content-Type'] = 'application/json' - req.body = {setting: setting}.to_json - req.options.timeout = @config.read_timeout - req.options.open_timeout = @config.connect_timeout - end - end - - unless res.success? - @config.logger.error("[LDClient] Failed to change setting, status: #{res.status}") - end - end - end -end \ No newline at end of file diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 7db8e555..b0b52a2e 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -3,99 +3,30 @@ require "celluloid/eventsource" module LaunchDarkly - PUT = "put" - PATCH = "patch" - DELETE = "delete" - - class InMemoryFeatureStore - def initialize - @features = Hash.new - @lock = Concurrent::ReadWriteLock.new - @initialized = Concurrent::AtomicBoolean.new(false) - end - - def get(key) - @lock.with_read_lock do - f = @features[key.to_sym] - (f.nil? || f[:deleted]) ? nil : f - end - end - - def all - @lock.with_read_lock do - @features.select { |_k, f| not f[:deleted] } - end - end - - def delete(key, version) - @lock.with_write_lock do - old = @features[key.to_sym] - - if !old.nil? && old[:version] < version - old[:deleted] = true - old[:version] = version - @features[key.to_sym] = old - elsif old.nil? - @features[key.to_sym] = { deleted: true, version: version } - end - end - end - - def init(fs) - @lock.with_write_lock do - @features.replace(fs) - @initialized.make_true - end - end - - def upsert(key, feature) - @lock.with_write_lock do - old = @features[key.to_sym] - - if old.nil? || old[:version] < feature[:version] - @features[key.to_sym] = feature - end - end - end - - def initialized? - @initialized.value - end - end + PUT = :put + PATCH = :patch + DELETE = :delete + INDIRECT_PUT = :'indirect/put' + INDIRECT_PATCH = :'indirect/patch' class StreamProcessor - def initialize(api_key, config) + def initialize(api_key, config, requestor) @api_key = api_key @config = config - @store = config.feature_store ? config.feature_store : InMemoryFeatureStore.new - @disconnected = Concurrent::AtomicReference.new(nil) + @store = config.feature_store + @requestor = requestor + @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) end def initialized? - @store.initialized? - end - - def started? - @started.value - end - - def get_all_features - if not initialized? - throw :uninitialized - end - @store.all - end - - def get_feature(key) - if not initialized? - throw :uninitialized - end - @store.get(key) + @initialized.value end def start return unless @started.make_true + + @config.logger.info("[LDClient] Initializing stream connection") headers = { @@ -103,51 +34,40 @@ def start 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION } opts = {:headers => headers, :with_credentials => true} - @es = Celluloid::EventSource.new(@config.stream_uri + "/features", opts) do |conn| - conn.on_open do - set_connected - end - + @es = Celluloid::EventSource.new(@config.stream_uri + "/flags", opts) do |conn| conn.on(PUT) { |message| process_message(message, PUT) } conn.on(PATCH) { |message| process_message(message, PATCH) } conn.on(DELETE) { |message| process_message(message, DELETE) } - + conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) } + conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) } conn.on_error do |message| - # TODO replace this with proper logging @config.logger.error("[LDClient] Error connecting to stream. Status code: #{message[:status_code]}") - set_disconnected end end end def process_message(message, method) message = JSON.parse(message.data, symbolize_names: true) + @config.logger.debug("[LDClient] Stream received #{method} message") if method == PUT @store.init(message) + @initialized.make_true + @config.logger.info("[LDClient] Stream initialized") elsif method == PATCH @store.upsert(message[:path][1..-1], message[:data]) elsif method == DELETE @store.delete(message[:path][1..-1], message[:version]) + elsif method == INDIRECT_PUT + @store.init(@requestor.request_all_flags) + @initialized.make_true + @config.logger.info("[LDClient] Stream initialized (via indirect message)") + elsif method == INDIRECT_PATCH + @store.upsert(@requestor.request_flag(message[:data])) else @config.logger.error("[LDClient] Unknown message received: #{method}") end - set_connected - end - - def set_disconnected - @disconnected.set(Time.now) - end - - def set_connected - @disconnected.set(nil) - end - - def should_fallback_update - disc = @disconnected.get - !disc.nil? && disc < (Time.now - 120) end - # TODO mark private methods - private :process_message, :set_connected, :set_disconnected + private :process_message end end diff --git a/spec/config_spec.rb b/spec/config_spec.rb index fda9f23c..da121ced 100644 --- a/spec/config_spec.rb +++ b/spec/config_spec.rb @@ -32,14 +32,14 @@ expect(subject.new.stream_uri).to eq subject.default_stream_uri end end - describe ".default_store" do + describe ".default_cache_store" do it "uses Rails cache if it is available" do rails = instance_double("Rails", cache: :cache) stub_const("Rails", rails) - expect(subject.default_store).to eq :cache + expect(subject.default_cache_store).to eq :cache end it "uses memory store if Rails is not available" do - expect(subject.default_store).to be_an_instance_of LaunchDarkly::ThreadSafeMemoryStore + expect(subject.default_cache_store).to be_an_instance_of LaunchDarkly::ThreadSafeMemoryStore end end describe ".default_logger" do diff --git a/spec/fixtures/feature.json b/spec/fixtures/feature.json index 3b347ca7..152565f7 100644 --- a/spec/fixtures/feature.json +++ b/spec/fixtures/feature.json @@ -1,67 +1,36 @@ { - "name":"New recommendations engine", - "key":"engine.enable", - "kind":"flag", - "salt":"ZW5naW5lLmVuYWJsZQ==", + "key":"test-feature-flag", + "version":11, "on":true, - "variations":[ + "prerequisites":[ + + ], + "salt":"718ea30a918a4eba8734b57ab1a93227", + "sel":"fe1244e5378c4f99976c9634e33667c6", + "targets":[ { - "value":true, - "weight":31, - "targets":[ - { - "attribute":"groups", - "op":"in", - "values":[ - "Microsoft" - ] - } + "values":[ + "alice" ], - "userTarget":{ - "attribute":"key", - "op":"in", - "values":[ - "foo@bar.com", - "Abbey.Orkwis@example.com", - "Abbie.Stolte@example.com", - "44d2781c-5985-4d89-b07a-0dffbd24126f", - "0ffe4f0c-7aa9-4621-a87c-abe1c051abd8", - "f52d99be-6a40-4cdd-a7b4-0548834fcbe5", - "Vernetta.Belden@example.com", - "c9d591bd-ea1f-465f-86d2-239ea41d0f0f", - "870745092" - ] - } + "variation":0 }, { - "value":false, - "weight":69, - "targets":[ - { - "attribute":"key", - "op":"in", - "values":[ - "Alida.Caples@example.com" - ] - }, - { - "attribute":"groups", - "op":"in", - "values":[ - - ] - } + "values":[ + "bob" ], - "userTarget":{ - "attribute":"key", - "op":"in", - "values":[ - "Alida.Caples@example.com" - ] - } + "variation":1 } ], - "ttl":0, - "commitDate":"2015-05-10T06:06:45.381Z", - "creationDate":"2014-09-02T20:39:18.61Z" -} + "rules":[ + + ], + "fallthrough":{ + "variation":0 + }, + "offVariation":1, + "variations":[ + true, + false + ], + "deleted":false +} \ No newline at end of file diff --git a/spec/ldclient_spec.rb b/spec/ldclient_spec.rb index 2ed0d545..3a0a0174 100644 --- a/spec/ldclient_spec.rb +++ b/spec/ldclient_spec.rb @@ -1,10 +1,11 @@ require "spec_helper" + describe LaunchDarkly::LDClient do subject { LaunchDarkly::LDClient } + let(:config) { LaunchDarkly::Config.new({:offline => true}) } let(:client) do - expect_any_instance_of(LaunchDarkly::LDClient).to receive :create_worker - subject.new("api_key") + subject.new("api_key", config) end let(:feature) do data = File.read(File.join("spec", "fixtures", "feature.json")) @@ -23,246 +24,32 @@ JSON.parse(data, symbolize_names: true) end - context 'user flag settings' do - describe '#update_user_flag_setting' do - it 'requires user' do - expect(client.instance_variable_get(:@config).logger).to receive(:error) - client.update_user_flag_setting(nil, feature[:key], true) - end - - it 'puts the new setting' do - result = double('result', success?: true, status: 204) - expect(client.instance_variable_get(:@client)).to receive(:put).and_return(result) - client.update_user_flag_setting(user[:key], feature[:key], true) - end - end - end - - describe '#flush' do - it "will flush and post all events" do - client.instance_variable_get(:@queue).push "asdf" - client.instance_variable_get(:@queue).push "asdf" - expect(client).to receive(:post_flushed_events) - client.flush - expect(client.instance_variable_get(:@queue).length).to eq 0 - end - it "will not do anything if there are no events" do - expect(client).to_not receive(:post_flushed_events) - expect(client.instance_variable_get(:@config).logger).to_not receive :error - client.flush - end - end - - describe '#post_flushed_events' do - let(:events) { ["event"] } - it "will flush and post all events" do - result = double("result", status: 200) - expect(client.instance_variable_get(:@client)).to receive(:post).with(LaunchDarkly::Config.default_events_uri + "/bulk").and_return result - expect(client.instance_variable_get(:@config).logger).to_not receive :error - client.send(:post_flushed_events, events) - expect(client.instance_variable_get(:@queue).length).to eq 0 - end - it "will allow any 2XX response" do - result = double("result", status: 202) - expect(client.instance_variable_get(:@client)).to receive(:post).and_return result - expect(client.instance_variable_get(:@config).logger).to_not receive :error - client.send(:post_flushed_events, events) - end - it "will work with unexpected post results" do - result = double("result", status: 418) - expect(client.instance_variable_get(:@client)).to receive(:post).and_return result - expect(client.instance_variable_get(:@config).logger).to receive :error - client.send(:post_flushed_events, events) - expect(client.instance_variable_get(:@queue).length).to eq 0 - end - end - describe '#toggle?' do - it "will not fail" do - expect(client.instance_variable_get(:@config)).to receive(:stream?).and_raise RuntimeError - expect(client.instance_variable_get(:@config).logger).to receive(:error) + it "will return the default value if the client is offline" do result = client.toggle?(feature[:key], user, "default") expect(result).to eq "default" end - it "will specify the default value in the feature request event" do - expect(client).to receive(:add_event).with(hash_including(default: "default")) - result = client.toggle?(feature[:key], user, "default") - end - it "requires user" do - expect(client.instance_variable_get(:@config).logger).to receive(:error) - result = client.toggle?(feature[:key], nil, "default") - expect(result).to eq "default" - end - it "sanitizes the user in the event" do - expect(client).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) - client.toggle?(feature[:key], numeric_key_user, "default") - end - it "returns value from streamed flag if available" do - expect(client.instance_variable_get(:@config)).to receive(:stream?).and_return(true).twice - expect(client.instance_variable_get(:@stream_processor)).to receive(:started?).and_return true - expect(client.instance_variable_get(:@stream_processor)).to receive(:initialized?).and_return true - expect(client).to receive(:add_event) - expect(client).to receive(:get_streamed_flag).and_return feature - result = client.toggle?(feature[:key], user, "default") - expect(result).to eq false - end - it "returns value from normal request if streamed flag is not available" do - expect(client.instance_variable_get(:@config)).to receive(:stream?).and_return(false).twice - expect(client).to receive(:add_event) - expect(client).to receive(:get_flag_int).and_return feature - result = client.toggle?(feature[:key], user, "default") - expect(result).to eq false - end - end - - describe '#identify' do - it "queues up an identify event" do - expect(client).to receive(:add_event).with(hash_including(kind: "identify", key: user[:key], user: user)) - client.identify(user) - end - it "sanitizes the user in the event" do - expect(client).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) - client.identify(numeric_key_user) - end end describe '#track' do it "queues up an custom event" do - expect(client).to receive(:add_event).with(hash_including(kind: "custom", key: "custom_event_name", user: user, data: 42)) + expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(kind: "custom", key: "custom_event_name", user: user, data: 42)) client.track("custom_event_name", user, 42) end it "sanitizes the user in the event" do - expect(client).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) + expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) client.track("custom_event_name", numeric_key_user, nil) end end - describe '#get_streamed_flag' do - it "will not check the polled flag normally" do - expect(client).to receive(:get_flag_stream).and_return true - expect(client).to_not receive(:get_flag_int) - expect(client.send(:get_streamed_flag, "key")).to eq true - end - context "debug stream" do - it "will log an error if the streamed and polled flag do not match" do - expect(client.instance_variable_get(:@config)).to receive(:debug_stream?).and_return true - expect(client).to receive(:get_flag_stream).and_return true - expect(client).to receive(:get_flag_int).and_return false - expect(client.instance_variable_get(:@config).logger).to receive(:error) - expect(client.send(:get_streamed_flag, "key")).to eq true - end - end - end - - describe '#all_flags' do - it "will parse and return the features list" do - result = double("Faraday::Response", status: 200, body: '{"asdf":"qwer"}') - expect(client).to receive(:make_request).with("/api/eval/features").and_return(result) - data = client.send(:all_flags) - expect(data).to eq(asdf: "qwer") - end - it "will log errors" do - result = double("Faraday::Response", status: 418) - expect(client).to receive(:make_request).with("/api/eval/features").and_return(result) - expect(client.instance_variable_get(:@config).logger).to receive(:error) - client.send(:all_flags) - end - end - - describe '#get_flag_int' do - it "will return the parsed flag" do - result = double("Faraday::Response", status: 200, body: '{"asdf":"qwer"}') - expect(client).to receive(:make_request).with("/api/eval/features/key").and_return(result) - data = client.send(:get_flag_int, "key") - expect(data).to eq(asdf: "qwer") - end - it "will accept 401 statuses" do - result = double("Faraday::Response", status: 401) - expect(client).to receive(:make_request).with("/api/eval/features/key").and_return(result) - expect(client.instance_variable_get(:@config).logger).to receive(:error) - data = client.send(:get_flag_int, "key") - expect(data).to be_nil - end - it "will accept 404 statuses" do - result = double("Faraday::Response", status: 404) - expect(client).to receive(:make_request).with("/api/eval/features/key").and_return(result) - expect(client.instance_variable_get(:@config).logger).to receive(:error) - data = client.send(:get_flag_int, "key") - expect(data).to be_nil - end - it "will accept non-standard statuses" do - result = double("Faraday::Response", status: 418) - expect(client).to receive(:make_request).with("/api/eval/features/key").and_return(result) - expect(client.instance_variable_get(:@config).logger).to receive(:error) - data = client.send(:get_flag_int, "key") - expect(data).to be_nil - end - end - - describe '#make_request' do - it "will make a proper request" do - expect(client.instance_variable_get :@client).to receive(:get) - client.send(:make_request, "/asdf") - end - end - - describe '#param_for_user' do - it "will return a consistent hash of a user key, feature key, and feature salt" do - param = client.send(:param_for_user, feature, user) - expect(param).to be_between(0.0, 1.0).inclusive - end - end - - describe '#evaluate' do - it "will return nil if there is no feature" do - expect(client.send(:evaluate, nil, user)).to eq nil - end - it "will return nil unless the feature is on" do - feature[:on] = false - expect(client.send(:evaluate, feature, user)).to eq nil - end - it "will return value if it matches the user" do - user = { key: "Alida.Caples@example.com" } - expect(client.send(:evaluate, feature, user)).to eq false - user = { key: "foo@bar.com" } - expect(client.send(:evaluate, feature, user)).to eq true - end - it "will return value if the target matches" do - user = { key: "asdf@asdf.com", custom: { groups: "Microsoft" } } - expect(client.send(:evaluate, feature, user)).to eq true - end - it "will return value if the weight matches" do - expect(client).to receive(:param_for_user).and_return 0.1 - expect(client.send(:evaluate, feature, user)).to eq true - expect(client).to receive(:param_for_user).and_return 0.9 - expect(client.send(:evaluate, feature, user)).to eq false - end - end - - describe '#log_timings' do - let(:block) { lambda { "result" } } - let(:label) { "label" } - it "will not measure if not configured to do so" do - expect(Benchmark).to_not receive(:measure) - client.send(:log_timings, label, &block) + describe '#identify' do + it "queues up an identify event" do + expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(kind: "identify", key: user[:key], user: user)) + client.identify(user) end - context "logging enabled" do - before do - expect(client.instance_variable_get(:@config)).to receive(:log_timings?).and_return true - expect(client.instance_variable_get(:@config).logger).to receive(:debug?).and_return true - end - it "will benchmark timings and return result" do - expect(Benchmark).to receive(:measure).and_call_original - expect(client.instance_variable_get(:@config).logger).to receive(:debug) - result = client.send(:log_timings, label, &block) - expect(result).to eq "result" - end - it "will raise exceptions if the block has them" do - block = lambda { raise RuntimeError } - expect(Benchmark).to receive(:measure).and_call_original - expect(client.instance_variable_get(:@config).logger).to receive(:debug) - expect { client.send(:log_timings, label, &block) }.to raise_error RuntimeError - end + it "sanitizes the user in the event" do + expect(client.instance_variable_get(:@event_processor)).to receive(:add_event).with(hash_including(user: sanitized_numeric_key_user)) + client.identify(numeric_key_user) end end @@ -276,4 +63,4 @@ end end end -end +end \ No newline at end of file diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index c3c8e94c..9a43355c 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -33,7 +33,8 @@ describe LaunchDarkly::StreamProcessor do subject { LaunchDarkly::StreamProcessor } let(:config) { LaunchDarkly::Config.new } - let(:processor) { subject.new("api_key", config) } + let(:requestor) { LaunchDarkly::Requestor.new("api_key", config)} + let(:processor) { subject.new("api_key", config, requestor) } describe '#process_message' do let(:put_message) { OpenStruct.new({data: '{"key": {"value": "asdf"}}'}) } @@ -57,19 +58,5 @@ processor.send(:process_message, put_message, "get") end end - - describe '#should_fallback_update' do - it "will return true if the stream is disconnected for more than 120 seconds" do - processor.send(:set_disconnected) - future_time = Time.now + 200 - expect(Time).to receive(:now).and_return(future_time) - value = processor.send(:should_fallback_update) - expect(value).to eq true - end - it "will return false otherwise" do - processor.send(:set_connected) - value = processor.send(:should_fallback_update) - expect(value).to eq false - end - end end +