Skip to content

updated evaluator #558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions splitio/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(self, factory, recorder, labels_enabled=True):
:rtype: Client
"""
ClientBase.__init__(self, factory, recorder, labels_enabled)
self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'))
self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))

def destroy(self):
"""
Expand Down Expand Up @@ -668,7 +668,7 @@ def __init__(self, factory, recorder, labels_enabled=True):
:rtype: Client
"""
ClientBase.__init__(self, factory, recorder, labels_enabled)
self._context_factory = AsyncEvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'))
self._context_factory = AsyncEvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))

async def destroy(self):
"""
Expand Down
81 changes: 66 additions & 15 deletions splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from splitio.models.grammar.condition import ConditionType
from splitio.models.grammar.matchers.misc import DependencyMatcher
from splitio.models.grammar.matchers.keys import UserDefinedSegmentMatcher
from splitio.models.grammar.matchers.rule_based_segment import RuleBasedSegmentMatcher
from splitio.optional.loaders import asyncio

CONTROL = 'control'
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships'])
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'segment_rbs_memberships', 'segment_rbs_conditions'])

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,9 +99,10 @@ def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx):

class EvaluationDataFactory:

def __init__(self, split_storage, segment_storage):
def __init__(self, split_storage, segment_storage, rbs_segment_storage):
self._flag_storage = split_storage
self._segment_storage = segment_storage
self._rbs_segment_storage = rbs_segment_storage

def context_for(self, key, feature_names):
"""
Expand All @@ -114,28 +116,50 @@ def context_for(self, key, feature_names):
pending = set(feature_names)
splits = {}
pending_memberships = set()
pending_rbs_memberships = set()
while pending:
fetched = self._flag_storage.fetch_many(list(pending))
features = filter_missing(fetched)
splits.update(features)
pending = set()
for feature in features.values():
cf, cs = get_dependencies(feature)
cf, cs, crbs = get_dependencies(feature)
pending.update(filter(lambda f: f not in splits, cf))
pending_memberships.update(cs)

return EvaluationContext(splits, {
segment: self._segment_storage.segment_contains(segment, key)
for segment in pending_memberships
})

pending_rbs_memberships.update(crbs)

rbs_segment_memberships = {}
rbs_segment_conditions = {}
key_membership = False
segment_memberhsip = False
for rbs_segment in pending_rbs_memberships:
key_membership = key in self._rbs_segment_storage.get(rbs_segment).excluded.get_excluded_keys()
segment_memberhsip = False
for segment_name in self._rbs_segment_storage.get(rbs_segment).excluded.get_excluded_segments():
if self._segment_storage.segment_contains(segment_name, key):
segment_memberhsip = True
break

rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
if not (segment_memberhsip or key_membership):
rbs_segment_conditions.update({rbs_segment: [condition for condition in self._rbs_segment_storage.get(rbs_segment).conditions]})

return EvaluationContext(
splits,
{ segment: self._segment_storage.segment_contains(segment, key)
for segment in pending_memberships
},
rbs_segment_memberships,
rbs_segment_conditions
)

class AsyncEvaluationDataFactory:

def __init__(self, split_storage, segment_storage):
def __init__(self, split_storage, segment_storage, rbs_segment_storage):
self._flag_storage = split_storage
self._segment_storage = segment_storage

self._rbs_segment_storage = rbs_segment_storage

async def context_for(self, key, feature_names):
"""
Recursively iterate & fetch all data required to evaluate these flags.
Expand All @@ -148,23 +172,47 @@ async def context_for(self, key, feature_names):
pending = set(feature_names)
splits = {}
pending_memberships = set()
pending_rbs_memberships = set()
while pending:
fetched = await self._flag_storage.fetch_many(list(pending))
features = filter_missing(fetched)
splits.update(features)
pending = set()
for feature in features.values():
cf, cs = get_dependencies(feature)
cf, cs, crbs = get_dependencies(feature)
pending.update(filter(lambda f: f not in splits, cf))
pending_memberships.update(cs)

pending_rbs_memberships.update(crbs)

segment_names = list(pending_memberships)
segment_memberships = await asyncio.gather(*[
self._segment_storage.segment_contains(segment, key)
for segment in segment_names
])

return EvaluationContext(splits, dict(zip(segment_names, segment_memberships)))
rbs_segment_memberships = {}
rbs_segment_conditions = {}
key_membership = False
segment_memberhsip = False
for rbs_segment in pending_rbs_memberships:
rbs_segment_obj = await self._rbs_segment_storage.get(rbs_segment)
key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
segment_memberhsip = False
for segment_name in rbs_segment_obj.excluded.get_excluded_segments():
if await self._segment_storage.segment_contains(segment_name, key):
segment_memberhsip = True
break

rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
if not (segment_memberhsip or key_membership):
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})

return EvaluationContext(
splits,
dict(zip(segment_names, segment_memberships)),
rbs_segment_memberships,
rbs_segment_conditions
)


def get_dependencies(feature):
Expand All @@ -173,14 +221,17 @@ def get_dependencies(feature):
"""
feature_names = []
segment_names = []
rbs_segment_names = []
for condition in feature.conditions:
for matcher in condition.matchers:
if isinstance(matcher,RuleBasedSegmentMatcher):
rbs_segment_names.append(matcher._rbs_segment_name)
if isinstance(matcher,UserDefinedSegmentMatcher):
segment_names.append(matcher._segment_name)
elif isinstance(matcher, DependencyMatcher):
feature_names.append(matcher._split_name)

return feature_names, segment_names
return feature_names, segment_names, rbs_segment_names

def filter_missing(features):
return {k: v for (k, v) in features.items() if v is not None}
Loading