Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions UnleashClient/connectors/streaming_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,46 @@ def _run(self):
)

# Initial hydration happens in the stream.
for event in self._client.events:
for item in self._client.all:
if self._stop.is_set():
LOGGER.debug("SSE stream stopped by user request")
break
if not event.event:
continue

if event.event in ("unleash-connected", "unleash-updated"):
try:
self.engine.take_state(event.data)
self.cache.set(FEATURES_URL, self.engine.get_state())
if hasattr(item, "event") and hasattr(item, "data"):
if not item.event:
continue

if event.event == "unleash-connected" and self.ready_callback:
try:
self.ready_callback()
except Exception:
LOGGER.debug("Ready callback failed", exc_info=True)
except Exception:
LOGGER.error("Error applying streaming state", exc_info=True)
self.load_features()
else:
LOGGER.debug("Ignoring SSE event type: %s", event.event)
if item.event in ("unleash-connected", "unleash-updated"):
try:
self.engine.take_state(item.data)
self.cache.set(FEATURES_URL, self.engine.get_state())

LOGGER.debug("SSE stream ended")
if (
item.event == "unleash-connected"
and self.ready_callback
):
try:
self.ready_callback()
except Exception:
LOGGER.debug("Ready callback failed", exc_info=True)
except Exception:
LOGGER.error(
"Error applying streaming state", exc_info=True
)
self.load_features()
else:
LOGGER.warning("Ignoring SSE event type: %s", item.event)
elif hasattr(item, "error"):
if item.error is None:
if not self._stop.is_set():
LOGGER.info(
"SSE stream ended - server closed connection gracefully"
)
else:
if not self._stop.is_set():
LOGGER.warning(
"SSE stream error: %s - will retry", item.error
)
except Exception as exc:
LOGGER.warning("Streaming connection failed: %s", exc)
self.load_features()
Expand Down
Loading