Skip to content

Polish #581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,9 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
label = Label.KILLED
_treatment = feature.default_treatment
else:
if feature.prerequisites is not None:
prerequisites_matcher = PrerequisitesMatcher(feature.prerequisites)
if not prerequisites_matcher.match(key, attrs, {
'evaluator': self,
'bucketing_key': bucketing,
'ec': ctx}):
label = Label.PREREQUISITES_NOT_MET
_treatment = feature.default_treatment
label, _treatment = self._check_prerequisites(feature, bucketing, key, attrs, ctx, label, _treatment)
label, _treatment = self._get_treatment(feature, bucketing, key, attrs, ctx, label, _treatment)

if _treatment == CONTROL:
treatment, label = self._treatment_for_flag(feature, key, bucketing, attrs, ctx)
if treatment is None:
label = Label.NO_CONDITION_MATCHED
_treatment = feature.default_treatment
else:
_treatment = treatment

return {
'treatment': _treatment,
'configurations': feature.get_configurations_for(_treatment) if feature else None,
Expand All @@ -84,6 +70,30 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
'impressions_disabled': feature.impressions_disabled if feature else None
}

def _get_treatment(self, feature, bucketing, key, attrs, ctx, label, _treatment):
if _treatment == CONTROL:
treatment, label = self._treatment_for_flag(feature, key, bucketing, attrs, ctx)
if treatment is None:
label = Label.NO_CONDITION_MATCHED
_treatment = feature.default_treatment
else:
_treatment = treatment

return label, _treatment

def _check_prerequisites(self, feature, bucketing, key, attrs, ctx, label, _treatment):
if feature.prerequisites is not None:
prerequisites_matcher = PrerequisitesMatcher(feature.prerequisites)
if not prerequisites_matcher.match(key, attrs, {
'evaluator': self,
'bucketing_key': bucketing,
'ec': ctx}):
label = Label.PREREQUISITES_NOT_MET
_treatment = feature.default_treatment

return label, _treatment


def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx):
"""
...
Expand Down
8 changes: 3 additions & 5 deletions splitio/models/grammar/matchers/rule_based_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ def _match_dep_rb_segments(self, excluded_rb_segments, key, attributes, context)
if key in excluded_segment.excluded.get_excluded_keys():
return False

if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context):
if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context) \
or self._match_conditions(excluded_segment.conditions, key, attributes, context):
return True

if self._match_conditions(excluded_segment.conditions, key, attributes, context):
return True


return False
43 changes: 29 additions & 14 deletions splitio/push/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class CompressionMode(Enum):
class WorkerBase(object, metaclass=abc.ABCMeta):
"""Worker template."""

_fetching_segment = "Fetching new segment {segment_name}"

@abc.abstractmethod
def is_running(self):
"""Return whether the working is running."""
Expand Down Expand Up @@ -226,27 +228,30 @@ def _apply_iff_if_needed(self, event):
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
for segment_name in segment_list:
if self._segment_storage.get(segment_name) is None:
_LOGGER.debug('Fetching new segment %s', segment_name)
_LOGGER.debug(self._fetching_segment.format(segment_name=segment_name))
self._segment_handler(segment_name, event.change_number)

referenced_rbs = self._get_referenced_rbs(new_feature_flag)
if len(referenced_rbs) > 0 and not self._rule_based_segment_storage.contains(referenced_rbs):
_LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs)
self._handler(None, event.change_number)
self._fetch_rbs_segment_if_needed(referenced_rbs, event)
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
else:
new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event)))
segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, [new_rbs], event.change_number)
for segment_name in segment_list:
if self._segment_storage.get(segment_name) is None:
_LOGGER.debug('Fetching new segment %s', segment_name)
_LOGGER.debug(self._fetching_segment.format(segment_name=segment_name))
self._segment_handler(segment_name, event.change_number)
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE)
return True

except Exception as e:
raise SplitStorageException(e)

def _fetch_rbs_segment_if_needed(self, referenced_rbs, event):
if len(referenced_rbs) > 0 and not self._rule_based_segment_storage.contains(referenced_rbs):
_LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs)
self._handler(None, event.change_number)

def _check_instant_ff_update(self, event):
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number():
return True
Expand All @@ -264,16 +269,15 @@ def _run(self):
break
if event == self._centinel:
continue

_LOGGER.debug('Processing feature flag update %d', event.change_number)
try:
if self._apply_iff_if_needed(event):
continue

till = None
rbs_till = None
if event.update_type == UpdateType.SPLIT_UPDATE:
till = event.change_number
else:
rbs_till = event.change_number
till, rbs_till = self._check_update_type(till, rbs_till, event)
sync_result = self._handler(till, rbs_till)
if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414:
_LOGGER.error("URI too long exception caught, sync failed")
Expand All @@ -288,6 +292,14 @@ def _run(self):
_LOGGER.error('Exception raised in feature flag synchronization')
_LOGGER.debug('Exception information: ', exc_info=True)

def _check_update_type(self, till, rbs_till, event):
if event.update_type == UpdateType.SPLIT_UPDATE:
till = event.change_number
else:
rbs_till = event.change_number

return till, rbs_till

def start(self):
"""Start worker."""
if self.is_running():
Expand Down Expand Up @@ -354,27 +366,30 @@ async def _apply_iff_if_needed(self, event):
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
for segment_name in segment_list:
if await self._segment_storage.get(segment_name) is None:
_LOGGER.debug('Fetching new segment %s', segment_name)
_LOGGER.debug(self._fetching_segment.format(segment_name=segment_name))
await self._segment_handler(segment_name, event.change_number)

referenced_rbs = self._get_referenced_rbs(new_feature_flag)
if len(referenced_rbs) > 0 and not await self._rule_based_segment_storage.contains(referenced_rbs):
await self._handler(None, event.change_number)

await self._fetch_rbs_segment_if_needed(referenced_rbs, event)
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
else:
new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event)))
segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, [new_rbs], event.change_number)
for segment_name in segment_list:
if await self._segment_storage.get(segment_name) is None:
_LOGGER.debug('Fetching new segment %s', segment_name)
_LOGGER.debug(self._fetching_segment.format(segment_name=segment_name))
await self._segment_handler(segment_name, event.change_number)
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE)
return True

except Exception as e:
raise SplitStorageException(e)

async def _fetch_rbs_segment_if_needed(self, referenced_rbs, event):
if len(referenced_rbs) > 0 and not await self._rule_based_segment_storage.contains(referenced_rbs):
_LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs)
await self._handler(None, event.change_number)

async def _check_instant_ff_update(self, event):
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == await self._feature_flag_storage.get_change_number():
return True
Expand Down
14 changes: 10 additions & 4 deletions splitio/sync/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ def _get_config_sets(self):

return ','.join(self._feature_flag_storage.flag_set_filter.sorted_flag_sets)

def _check_exit_conditions(self, till, rbs_till, change_number, rbs_change_number):
return (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number)

def _check_return_conditions(self, feature_flag_changes):
return feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']

class SplitSynchronizer(SplitSynchronizerBase):
"""Feature Flag changes synchronizer."""

Expand Down Expand Up @@ -119,7 +125,7 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None):
if rbs_change_number is None:
rbs_change_number = -1

if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number):
if self._check_exit_conditions(till, rbs_till, change_number, rbs_change_number):
# the passed till is less than change_number, no need to perform updates
return change_number, rbs_change_number, segment_list

Expand All @@ -142,7 +148,7 @@ def _fetch_until(self, fetch_options, till=None, rbs_till=None):
segment_list.update(update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage))
segment_list.update(rbs_segment_list)

if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
if self._check_return_conditions(feature_flag_changes):
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
Expand Down Expand Up @@ -278,7 +284,7 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
if rbs_change_number is None:
rbs_change_number = -1

if (till is not None and till < change_number) or (rbs_till is not None and rbs_till < rbs_change_number):
if self._check_exit_conditions(till, rbs_till, change_number, rbs_change_number):
# the passed till is less than change_number, no need to perform updates
return change_number, rbs_change_number, segment_list

Expand All @@ -301,7 +307,7 @@ async def _fetch_until(self, fetch_options, till=None, rbs_till=None):
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes.get('ff')['t'], self._api.clear_storage)
segment_list.update(rbs_segment_list)

if feature_flag_changes.get('ff')['t'] == feature_flag_changes.get('ff')['s'] and feature_flag_changes.get('rbs')['t'] == feature_flag_changes.get('rbs')['s']:
if self._check_return_conditions(feature_flag_changes):
return feature_flag_changes.get('ff')['t'], feature_flag_changes.get('rbs')['t'], segment_list

async def _attempt_feature_flag_sync(self, fetch_options, till=None, rbs_till=None):
Expand Down