Skip to content

Commit

Permalink
prepare 6.13.1 release (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
LaunchDarklyCI authored Jul 13, 2020
1 parent 3b6f6c1 commit 6d4ee4b
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 177 deletions.
11 changes: 6 additions & 5 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,18 @@ def _make_update_processor(self, config, store, ready, diagnostic_accumulator):
if config.offline or config.use_ldd:
return NullUpdateProcessor(config, store, ready)

if config.stream:
return StreamingUpdateProcessor(config, store, ready, diagnostic_accumulator)

log.info("Disabling streaming API")
log.warning("You should only disable the streaming API if instructed to do so by LaunchDarkly support")

if config.feature_requester_class:
feature_requester = config.feature_requester_class(config)
else:
feature_requester = FeatureRequesterImpl(config)
""" :type: FeatureRequester """

if config.stream:
return StreamingUpdateProcessor(config, feature_requester, store, ready, diagnostic_accumulator)

log.info("Disabling streaming API")
log.warning("You should only disable the streaming API if instructed to do so by LaunchDarkly support")
return PollingUpdateProcessor(config, feature_requester, store, ready)

def get_sdk_key(self):
Expand Down
99 changes: 63 additions & 36 deletions ldclient/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
from ldclient.interfaces import EventProcessor
from ldclient.repeating_timer import RepeatingTimer
from ldclient.util import UnsuccessfulResponseException
from ldclient.util import _headers, _retryable_statuses
from ldclient.util import log
from ldclient.util import http_error_message, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response
from ldclient.util import check_if_error_is_recoverable_and_log, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response, _headers
from ldclient.diagnostics import create_diagnostic_init

__MAX_FLUSH_THREADS__ = 5
Expand Down Expand Up @@ -141,18 +140,6 @@ def _get_userkey(self, event):
return str(event['user'].get('key'))


class _EventRetry(urllib3.Retry):
def __init__(self):
urllib3.Retry.__init__(self, total=1,
method_whitelist=False, # Enable retry on POST
status_forcelist=_retryable_statuses,
raise_on_status=False)

# Override backoff time to be flat 1 second
def get_backoff_time(self):
return 1


class EventPayloadSendTask(object):
def __init__(self, http, config, formatter, payload, response_fn):
self._http = http
Expand All @@ -175,16 +162,17 @@ def _do_send(self, output_events):
try:
json_body = json.dumps(output_events)
log.debug('Sending events payload: ' + json_body)
hdrs = _headers(self._config)
hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
hdrs['X-LaunchDarkly-Payload-ID'] = str(uuid.uuid4())
uri = self._config.events_uri
r = self._http.request('POST', uri,
headers=hdrs,
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
body=json_body,
retries=_EventRetry())
self._response_fn(r)
payload_id = str(uuid.uuid4())
r = _post_events_with_retry(
self._http,
self._config,
self._config.events_uri,
payload_id,
json_body,
"%d events" % len(self._payload.events)
)
if r:
self._response_fn(r)
return r
except Exception as e:
log.warning(
Expand All @@ -202,13 +190,14 @@ def run(self):
try:
json_body = json.dumps(self._event_body)
log.debug('Sending diagnostic event: ' + json_body)
hdrs = _headers(self._config)
uri = self._config.events_base_uri + '/diagnostic'
r = self._http.request('POST', uri,
headers=hdrs,
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
body=json_body,
retries=1)
_post_events_with_retry(
self._http,
self._config,
self._config.events_base_uri + '/diagnostic',
None,
json_body,
"diagnostic event"
)
except Exception as e:
log.warning(
'Unhandled exception in event processor. Diagnostic event was not sent. [%s]', e)
Expand Down Expand Up @@ -381,11 +370,9 @@ def _handle_response(self, r):
if server_date is not None:
timestamp = int(time.mktime(server_date) * 1000)
self._last_known_past_time = timestamp
if r.status > 299:
log.error(http_error_message(r.status, "event delivery", "some events were dropped"))
if not is_http_error_recoverable(r.status):
self._disabled = True
return
if r.status > 299 and not is_http_error_recoverable(r.status):
self._disabled = True
return

def _send_and_reset_diagnostics(self):
if self._diagnostic_accumulator is not None:
Expand Down Expand Up @@ -472,3 +459,43 @@ def __enter__(self):

def __exit__(self, type, value, traceback):
self.stop()


def _post_events_with_retry(
http_client,
config,
uri,
payload_id,
body,
events_description
):
hdrs = _headers(config)
hdrs['Content-Type'] = 'application/json'
if payload_id:
hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
hdrs['X-LaunchDarkly-Payload-ID'] = payload_id
can_retry = True
context = "posting %s" % events_description
while True:
next_action_message = "will retry" if can_retry else "some events were dropped"
try:
r = http_client.request(
'POST',
uri,
headers=hdrs,
body=body,
timeout=urllib3.Timeout(connect=config.connect_timeout, read=config.read_timeout),
retries=0
)
if r.status < 300:
return r
recoverable = check_if_error_is_recoverable_and_log(context, r.status, None, next_action_message)
if not recoverable:
return r
except Exception as e:
check_if_error_is_recoverable_and_log(context, None, str(e), next_action_message)
if not can_retry:
return None
can_retry = False
# fixed delay of 1 second for event retries
time.sleep(1)
29 changes: 12 additions & 17 deletions ldclient/feature_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,33 @@ def __init__(self, config):
self._cache = dict()
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
self._config = config
self._poll_uri = config.base_uri + LATEST_ALL_URI

def get_all_data(self):
all_data = self._do_request(self._config.base_uri + LATEST_ALL_URI, True)
return {
FEATURES: all_data['flags'],
SEGMENTS: all_data['segments']
}

def get_one(self, kind, key):
return self._do_request(self._config.base_uri + kind.request_api_path + '/' + key, False)

def _do_request(self, uri, allow_cache):
uri = self._poll_uri
hdrs = _headers(self._config)
if allow_cache:
cache_entry = self._cache.get(uri)
if cache_entry is not None:
hdrs['If-None-Match'] = cache_entry.etag
cache_entry = self._cache.get(uri)
if cache_entry is not None:
hdrs['If-None-Match'] = cache_entry.etag
r = self._http.request('GET', uri,
headers=hdrs,
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
retries=1)
throw_if_unsuccessful_response(r)
if r.status == 304 and allow_cache and cache_entry is not None:
if r.status == 304 and cache_entry is not None:
data = cache_entry.data
etag = cache_entry.etag
from_cache = True
else:
data = json.loads(r.data.decode('UTF-8'))
etag = r.getheader('ETag')
from_cache = False
if allow_cache and etag is not None:
if etag is not None:
self._cache[uri] = CacheEntry(data=data, etag=etag)
log.debug("%s response status:[%d] From cache? [%s] ETag:[%s]",
uri, r.status, from_cache, etag)
return data

return {
FEATURES: data['flags'],
SEGMENTS: data['segments']
}
7 changes: 0 additions & 7 deletions ldclient/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,6 @@ def get_all(self):
"""
pass

def get_one(self, kind, key):
"""
Gets one Feature flag
:return:
"""
pass


class DiagnosticDescription(object):
"""
Expand Down
19 changes: 3 additions & 16 deletions ldclient/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@


class StreamingUpdateProcessor(Thread, UpdateProcessor):
def __init__(self, config, requester, store, ready, diagnostic_accumulator):
def __init__(self, config, store, ready, diagnostic_accumulator):
Thread.__init__(self)
self.daemon = True
self._uri = config.stream_base_uri + STREAM_ALL_PATH
self._config = config
self._requester = requester
self._store = store
self._running = False
self._ready = ready
Expand Down Expand Up @@ -77,7 +76,7 @@ def run(self):
if not self._running:
break
self._retry_delay.set_good_since(time.time())
message_ok = self.process_message(self._store, self._requester, msg)
message_ok = self.process_message(self._store, msg)
if message_ok:
self._record_stream_init(False)
self._es_started = None
Expand Down Expand Up @@ -122,7 +121,7 @@ def initialized(self):

# Returns True if we initialized the feature store
@staticmethod
def process_message(store, requester, msg):
def process_message(store, msg):
if msg.event == 'put':
all_data = json.loads(msg.data)
init_data = {
Expand All @@ -143,18 +142,6 @@ def process_message(store, requester, msg):
store.upsert(target.kind, obj)
else:
log.warning("Patch for unknown path: %s", path)
elif msg.event == "indirect/patch":
path = msg.data
log.debug("Received indirect/patch event for %s", path)
target = StreamingUpdateProcessor._parse_path(path)
if target is not None:
store.upsert(target.kind, requester.get_one(target.kind, target.key))
else:
log.warning("Indirect patch for unknown path: %s", path)
elif msg.event == "indirect/put":
log.debug("Received indirect/put event")
store.init(requester.get_all_data())
return True
elif msg.event == 'delete':
payload = json.loads(msg.data)
path = payload['path']
Expand Down
19 changes: 16 additions & 3 deletions ldclient/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,28 @@ def is_http_error_recoverable(status):
return True # all other errors are recoverable


def http_error_description(status):
return "HTTP error %d%s" % (status, " (invalid SDK key)" if (status == 401 or status == 403) else "")


def http_error_message(status, context, retryable_message = "will retry"):
return "Received HTTP error %d%s for %s - %s" % (
status,
" (invalid SDK key)" if (status == 401 or status == 403) else "",
return "Received %s for %s - %s" % (
http_error_description(status),
context,
retryable_message if is_http_error_recoverable(status) else "giving up permanently"
)


def check_if_error_is_recoverable_and_log(error_context, status_code, error_desc, recoverable_message):
if status_code and (error_desc is None):
error_desc = http_error_description(status_code)
if status_code and not is_http_error_recoverable(status_code):
log.error("Error %s (giving up permanently): %s" % (error_context, error_desc))
return False
log.warning("Error %s (%s): %s" % (error_context, recoverable_message, error_desc))
return True


def stringify_attrs(attrdict, attrs):
if attrdict is None:
return None
Expand Down
3 changes: 0 additions & 3 deletions testing/stub_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ def get_all_data(self):
raise self.exception
return self.all_data

def get_one(self, kind, key):
pass

class MockResponse(object):
def __init__(self, status, headers):
self._status = status
Expand Down
Loading

0 comments on commit 6d4ee4b

Please sign in to comment.