diff --git a/ldclient/client.py b/ldclient/client.py index c4406b3e..a02a49f5 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -141,17 +141,18 @@ def _make_update_processor(self, config, store, ready, diagnostic_accumulator): if config.offline or config.use_ldd: return NullUpdateProcessor(config, store, ready) + if config.stream: + return StreamingUpdateProcessor(config, store, ready, diagnostic_accumulator) + + log.info("Disabling streaming API") + log.warning("You should only disable the streaming API if instructed to do so by LaunchDarkly support") + if config.feature_requester_class: feature_requester = config.feature_requester_class(config) else: feature_requester = FeatureRequesterImpl(config) """ :type: FeatureRequester """ - if config.stream: - return StreamingUpdateProcessor(config, feature_requester, store, ready, diagnostic_accumulator) - - log.info("Disabling streaming API") - log.warning("You should only disable the streaming API if instructed to do so by LaunchDarkly support") return PollingUpdateProcessor(config, feature_requester, store, ready) def get_sdk_key(self): diff --git a/ldclient/event_processor.py b/ldclient/event_processor.py index bbc18076..b94d800a 100644 --- a/ldclient/event_processor.py +++ b/ldclient/event_processor.py @@ -28,9 +28,8 @@ from ldclient.interfaces import EventProcessor from ldclient.repeating_timer import RepeatingTimer from ldclient.util import UnsuccessfulResponseException -from ldclient.util import _headers, _retryable_statuses from ldclient.util import log -from ldclient.util import http_error_message, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response +from ldclient.util import check_if_error_is_recoverable_and_log, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response, _headers from ldclient.diagnostics import create_diagnostic_init __MAX_FLUSH_THREADS__ = 5 @@ -141,18 +140,6 @@ def _get_userkey(self, event): return str(event['user'].get('key')) -class _EventRetry(urllib3.Retry): - def __init__(self): - urllib3.Retry.__init__(self, total=1, - method_whitelist=False, # Enable retry on POST - status_forcelist=_retryable_statuses, - raise_on_status=False) - - # Override backoff time to be flat 1 second - def get_backoff_time(self): - return 1 - - class EventPayloadSendTask(object): def __init__(self, http, config, formatter, payload, response_fn): self._http = http @@ -175,16 +162,17 @@ def _do_send(self, output_events): try: json_body = json.dumps(output_events) log.debug('Sending events payload: ' + json_body) - hdrs = _headers(self._config) - hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__) - hdrs['X-LaunchDarkly-Payload-ID'] = str(uuid.uuid4()) - uri = self._config.events_uri - r = self._http.request('POST', uri, - headers=hdrs, - timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout), - body=json_body, - retries=_EventRetry()) - self._response_fn(r) + payload_id = str(uuid.uuid4()) + r = _post_events_with_retry( + self._http, + self._config, + self._config.events_uri, + payload_id, + json_body, + "%d events" % len(self._payload.events) + ) + if r: + self._response_fn(r) return r except Exception as e: log.warning( @@ -202,13 +190,14 @@ def run(self): try: json_body = json.dumps(self._event_body) log.debug('Sending diagnostic event: ' + json_body) - hdrs = _headers(self._config) - uri = self._config.events_base_uri + '/diagnostic' - r = self._http.request('POST', uri, - headers=hdrs, - timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout), - body=json_body, - retries=1) + _post_events_with_retry( + self._http, + self._config, + self._config.events_base_uri + '/diagnostic', + None, + json_body, + "diagnostic event" + ) except Exception as e: log.warning( 'Unhandled exception in event processor. Diagnostic event was not sent. [%s]', e) @@ -381,11 +370,9 @@ def _handle_response(self, r): if server_date is not None: timestamp = int(time.mktime(server_date) * 1000) self._last_known_past_time = timestamp - if r.status > 299: - log.error(http_error_message(r.status, "event delivery", "some events were dropped")) - if not is_http_error_recoverable(r.status): - self._disabled = True - return + if r.status > 299 and not is_http_error_recoverable(r.status): + self._disabled = True + return def _send_and_reset_diagnostics(self): if self._diagnostic_accumulator is not None: @@ -472,3 +459,43 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.stop() + + +def _post_events_with_retry( + http_client, + config, + uri, + payload_id, + body, + events_description +): + hdrs = _headers(config) + hdrs['Content-Type'] = 'application/json' + if payload_id: + hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__) + hdrs['X-LaunchDarkly-Payload-ID'] = payload_id + can_retry = True + context = "posting %s" % events_description + while True: + next_action_message = "will retry" if can_retry else "some events were dropped" + try: + r = http_client.request( + 'POST', + uri, + headers=hdrs, + body=body, + timeout=urllib3.Timeout(connect=config.connect_timeout, read=config.read_timeout), + retries=0 + ) + if r.status < 300: + return r + recoverable = check_if_error_is_recoverable_and_log(context, r.status, None, next_action_message) + if not recoverable: + return r + except Exception as e: + check_if_error_is_recoverable_and_log(context, None, str(e), next_action_message) + if not can_retry: + return None + can_retry = False + # fixed delay of 1 second for event retries + time.sleep(1) diff --git a/ldclient/feature_requester.py b/ldclient/feature_requester.py index 3ab812fe..4557104f 100644 --- a/ldclient/feature_requester.py +++ b/ldclient/feature_requester.py @@ -27,29 +27,20 @@ def __init__(self, config): self._cache = dict() self._http = _http_factory(config).create_pool_manager(1, config.base_uri) self._config = config + self._poll_uri = config.base_uri + LATEST_ALL_URI def get_all_data(self): - all_data = self._do_request(self._config.base_uri + LATEST_ALL_URI, True) - return { - FEATURES: all_data['flags'], - SEGMENTS: all_data['segments'] - } - - def get_one(self, kind, key): - return self._do_request(self._config.base_uri + kind.request_api_path + '/' + key, False) - - def _do_request(self, uri, allow_cache): + uri = self._poll_uri hdrs = _headers(self._config) - if allow_cache: - cache_entry = self._cache.get(uri) - if cache_entry is not None: - hdrs['If-None-Match'] = cache_entry.etag + cache_entry = self._cache.get(uri) + if cache_entry is not None: + hdrs['If-None-Match'] = cache_entry.etag r = self._http.request('GET', uri, headers=hdrs, timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout), retries=1) throw_if_unsuccessful_response(r) - if r.status == 304 and allow_cache and cache_entry is not None: + if r.status == 304 and cache_entry is not None: data = cache_entry.data etag = cache_entry.etag from_cache = True @@ -57,8 +48,12 @@ def _do_request(self, uri, allow_cache): data = json.loads(r.data.decode('UTF-8')) etag = r.getheader('ETag') from_cache = False - if allow_cache and etag is not None: + if etag is not None: self._cache[uri] = CacheEntry(data=data, etag=etag) log.debug("%s response status:[%d] From cache? [%s] ETag:[%s]", uri, r.status, from_cache, etag) - return data + + return { + FEATURES: data['flags'], + SEGMENTS: data['segments'] + } diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 1a319494..6b49782c 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -263,13 +263,6 @@ def get_all(self): """ pass - def get_one(self, kind, key): - """ - Gets one Feature flag - :return: - """ - pass - class DiagnosticDescription(object): """ diff --git a/ldclient/streaming.py b/ldclient/streaming.py index abc54247..061bca65 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -33,12 +33,11 @@ class StreamingUpdateProcessor(Thread, UpdateProcessor): - def __init__(self, config, requester, store, ready, diagnostic_accumulator): + def __init__(self, config, store, ready, diagnostic_accumulator): Thread.__init__(self) self.daemon = True self._uri = config.stream_base_uri + STREAM_ALL_PATH self._config = config - self._requester = requester self._store = store self._running = False self._ready = ready @@ -77,7 +76,7 @@ def run(self): if not self._running: break self._retry_delay.set_good_since(time.time()) - message_ok = self.process_message(self._store, self._requester, msg) + message_ok = self.process_message(self._store, msg) if message_ok: self._record_stream_init(False) self._es_started = None @@ -122,7 +121,7 @@ def initialized(self): # Returns True if we initialized the feature store @staticmethod - def process_message(store, requester, msg): + def process_message(store, msg): if msg.event == 'put': all_data = json.loads(msg.data) init_data = { @@ -143,18 +142,6 @@ def process_message(store, requester, msg): store.upsert(target.kind, obj) else: log.warning("Patch for unknown path: %s", path) - elif msg.event == "indirect/patch": - path = msg.data - log.debug("Received indirect/patch event for %s", path) - target = StreamingUpdateProcessor._parse_path(path) - if target is not None: - store.upsert(target.kind, requester.get_one(target.kind, target.key)) - else: - log.warning("Indirect patch for unknown path: %s", path) - elif msg.event == "indirect/put": - log.debug("Received indirect/put event") - store.init(requester.get_all_data()) - return True elif msg.event == 'delete': payload = json.loads(msg.data) path = payload['path'] diff --git a/ldclient/util.py b/ldclient/util.py index 189247db..3880c330 100644 --- a/ldclient/util.py +++ b/ldclient/util.py @@ -89,15 +89,28 @@ def is_http_error_recoverable(status): return True # all other errors are recoverable +def http_error_description(status): + return "HTTP error %d%s" % (status, " (invalid SDK key)" if (status == 401 or status == 403) else "") + + def http_error_message(status, context, retryable_message = "will retry"): - return "Received HTTP error %d%s for %s - %s" % ( - status, - " (invalid SDK key)" if (status == 401 or status == 403) else "", + return "Received %s for %s - %s" % ( + http_error_description(status), context, retryable_message if is_http_error_recoverable(status) else "giving up permanently" ) +def check_if_error_is_recoverable_and_log(error_context, status_code, error_desc, recoverable_message): + if status_code and (error_desc is None): + error_desc = http_error_description(status_code) + if status_code and not is_http_error_recoverable(status_code): + log.error("Error %s (giving up permanently): %s" % (error_context, error_desc)) + return False + log.warning("Error %s (%s): %s" % (error_context, recoverable_message, error_desc)) + return True + + def stringify_attrs(attrdict, attrs): if attrdict is None: return None diff --git a/testing/stub_util.py b/testing/stub_util.py index a5aada7d..a5bd6b9f 100644 --- a/testing/stub_util.py +++ b/testing/stub_util.py @@ -67,9 +67,6 @@ def get_all_data(self): raise self.exception return self.all_data - def get_one(self, kind, key): - pass - class MockResponse(object): def __init__(self, status, headers): self._status = status diff --git a/testing/test_feature_requester.py b/testing/test_feature_requester.py index 3964ad10..10f8d11e 100644 --- a/testing/test_feature_requester.py +++ b/testing/test_feature_requester.py @@ -102,82 +102,6 @@ def test_get_all_data_can_use_cached_data(): req = server.require_request() assert req.headers['If-None-Match'] == etag2 -def test_get_one_flag_returns_data(): - with start_server() as server: - config = Config(sdk_key = 'sdk-key', base_uri = server.uri) - fr = FeatureRequesterImpl(config) - key = 'flag1' - flag_data = { 'key': key } - server.for_path('/sdk/latest-flags/' + key, JsonResponse(flag_data)) - result = fr.get_one(FEATURES, key) - assert result == flag_data - -def test_get_one_flag_sends_headers(): - with start_server() as server: - config = Config(sdk_key = 'sdk-key', base_uri = server.uri) - fr = FeatureRequesterImpl(config) - key = 'flag1' - flag_data = { 'key': key } - server.for_path('/sdk/latest-flags/' + key, JsonResponse(flag_data)) - fr.get_one(FEATURES, key) - req = server.require_request() - assert req.headers['Authorization'] == 'sdk-key' - assert req.headers['User-Agent'] == 'PythonClient/' + VERSION - assert req.headers.get('X-LaunchDarkly-Wrapper') is None - -def test_get_one_flag_sends_wrapper_header(): - with start_server() as server: - config = Config(sdk_key = 'sdk-key', base_uri = server.uri, - wrapper_name = 'Flask', wrapper_version = '0.1.0') - fr = FeatureRequesterImpl(config) - key = 'flag1' - flag_data = { 'key': key } - server.for_path('/sdk/latest-flags/' + key, JsonResponse(flag_data)) - fr.get_one(FEATURES, key) - req = server.require_request() - assert req.headers.get('X-LaunchDarkly-Wrapper') == 'Flask/0.1.0' - -def test_get_one_flag_sends_wrapper_header_without_version(): - with start_server() as server: - config = Config(sdk_key = 'sdk-key', base_uri = server.uri, - wrapper_name = 'Flask') - fr = FeatureRequesterImpl(config) - key = 'flag1' - flag_data = { 'key': key } - server.for_path('/sdk/latest-flags/' + key, JsonResponse(flag_data)) - fr.get_one(FEATURES, key) - req = server.require_request() - assert req.headers.get('X-LaunchDarkly-Wrapper') == 'Flask' - -def test_get_one_flag_throws_on_error(): - with start_server() as server: - config = Config(sdk_key = 'sdk-key', base_uri = server.uri) - fr = FeatureRequesterImpl(config) - with pytest.raises(UnsuccessfulResponseException) as e: - fr.get_one(FEATURES, 'didnt-set-up-a-response-for-this-flag') - assert e.value.status == 404 - -def test_get_one_flag_does_not_use_etags(): - with start_server() as server: - config = Config(sdk_key = 'sdk-key', base_uri = server.uri) - fr = FeatureRequesterImpl(config) - - etag = 'my-etag' - key = 'flag1' - flag_data = { 'key': key } - req_path = '/sdk/latest-flags/' + key - server.for_path(req_path, JsonResponse(flag_data, { 'Etag': etag })) - - result = fr.get_one(FEATURES, key) - assert result == flag_data - req = server.require_request() - assert 'If-None-Match' not in req.headers.keys() - - result = fr.get_one(FEATURES, key) - assert result == flag_data - req = server.require_request() - assert 'If-None-Match' not in req.headers.keys() # did not send etag from previous request - def test_can_use_http_proxy_via_environment_var(monkeypatch): with start_server() as server: monkeypatch.setenv('http_proxy', server.uri) diff --git a/testing/test_streaming.py b/testing/test_streaming.py index 75da9ea4..dadac824 100644 --- a/testing/test_streaming.py +++ b/testing/test_streaming.py @@ -30,7 +30,7 @@ def test_request_properties(): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri) server.for_path('/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() req = server.await_request() assert req.method == 'GET' @@ -48,7 +48,7 @@ def test_sends_wrapper_header(): wrapper_name = 'Flask', wrapper_version = '0.1.0') server.for_path('/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() req = server.await_request() assert req.headers.get('X-LaunchDarkly-Wrapper') == 'Flask/0.1.0' @@ -63,7 +63,7 @@ def test_sends_wrapper_header_without_version(): wrapper_name = 'Flask') server.for_path('/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() req = server.await_request() assert req.headers.get('X-LaunchDarkly-Wrapper') == 'Flask' @@ -79,7 +79,7 @@ def test_receives_put_event(): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri) server.for_path('/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() ready.wait(start_wait) assert sp.initialized() @@ -99,7 +99,7 @@ def test_receives_patch_events(): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri) server.for_path('/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() ready.wait(start_wait) assert sp.initialized() @@ -123,7 +123,7 @@ def test_receives_delete_events(): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri) server.for_path('/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() ready.wait(start_wait) assert sp.initialized() @@ -148,7 +148,7 @@ def test_reconnects_if_stream_is_broken(): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) server.for_path('/all', SequentialHandler(stream1, stream2)) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() server.await_request ready.wait(start_wait) @@ -169,7 +169,7 @@ def test_retries_on_network_error(): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) server.for_path('/all', two_errors_then_success) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() ready.wait(start_wait) assert sp.initialized() @@ -187,7 +187,7 @@ def test_recoverable_http_error(status): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) server.for_path('/all', two_errors_then_success) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() ready.wait(start_wait) assert sp.initialized() @@ -204,7 +204,7 @@ def test_unrecoverable_http_error(status): config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) server.for_path('/all', error_then_success) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() ready.wait(5) assert not sp.initialized() @@ -237,7 +237,7 @@ def _verify_http_proxy_is_used(server, config): ready = Event() with stream_content(make_put_event()) as stream: server.for_path(config.stream_base_uri + '/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() # For an insecure proxy request, our stub server behaves enough like the real thing to satisfy the # HTTP client, so we should be able to see the request go through. Note that the URI path will @@ -252,7 +252,7 @@ def _verify_https_proxy_is_used(server, config): ready = Event() with stream_content(make_put_event()) as stream: server.for_path(config.stream_base_uri + '/all', stream) - with StreamingUpdateProcessor(config, None, store, ready, None) as sp: + with StreamingUpdateProcessor(config, store, ready, None) as sp: sp.start() # Our simple stub server implementation can't really do HTTPS proxying, so the request will fail, but # it can still record that it *got* the request, which proves that the request went to the proxy. @@ -268,7 +268,7 @@ def test_records_diagnostic_on_stream_init_success(): server.for_path('/all', stream) diag_accum = _DiagnosticAccumulator(1) - with StreamingUpdateProcessor(config, None, store, ready, diag_accum) as sp: + with StreamingUpdateProcessor(config, store, ready, diag_accum) as sp: sp.start() ready.wait(start_wait) recorded_inits = diag_accum.create_event_and_reset(0, 0)['streamInits'] @@ -286,7 +286,7 @@ def test_records_diagnostic_on_stream_init_failure(): server.for_path('/all', error_then_success) diag_accum = _DiagnosticAccumulator(1) - with StreamingUpdateProcessor(config, None, store, ready, diag_accum) as sp: + with StreamingUpdateProcessor(config, store, ready, diag_accum) as sp: sp.start() ready.wait(start_wait) recorded_inits = diag_accum.create_event_and_reset(0, 0)['streamInits']