From 3396b5fa85a88f7a919b42a9eafba0b8410e6e6c Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 12 Mar 2025 12:37:14 -0300 Subject: [PATCH 1/6] Updated SSE classes --- splitio/models/telemetry.py | 1 + splitio/push/parser.py | 60 ++++++++++- splitio/push/processor.py | 10 +- splitio/push/workers.py | 121 +++++++++++++++------ splitio/spec.py | 2 +- tests/push/test_split_worker.py | 179 ++++++++++++++++++++++++-------- 6 files changed, 292 insertions(+), 81 deletions(-) diff --git a/splitio/models/telemetry.py b/splitio/models/telemetry.py index f734cf67..c9715da4 100644 --- a/splitio/models/telemetry.py +++ b/splitio/models/telemetry.py @@ -140,6 +140,7 @@ class OperationMode(Enum): class UpdateFromSSE(Enum): """Update from sse constants""" SPLIT_UPDATE = 'sp' + RBS_UPDATE = 'rbs' def get_latency_bucket_index(micros): """ diff --git a/splitio/push/parser.py b/splitio/push/parser.py index 098221e1..79b410e3 100644 --- a/splitio/push/parser.py +++ b/splitio/push/parser.py @@ -28,6 +28,7 @@ class UpdateType(Enum): SPLIT_UPDATE = 'SPLIT_UPDATE' SPLIT_KILL = 'SPLIT_KILL' SEGMENT_UPDATE = 'SEGMENT_UPDATE' + RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE' class ControlType(Enum): @@ -329,7 +330,7 @@ def __init__(self, channel, timestamp, change_number, previous_change_number, fe """Class constructor.""" BaseUpdate.__init__(self, channel, timestamp, change_number) self._previous_change_number = previous_change_number - self._feature_flag_definition = feature_flag_definition + self._object_definition = feature_flag_definition self._compression = compression @property @@ -352,13 +353,13 @@ def previous_change_number(self): # pylint:disable=no-self-use return self._previous_change_number @property - def feature_flag_definition(self): # pylint:disable=no-self-use + def object_definition(self): # pylint:disable=no-self-use """ Return feature flag definition :returns: The new feature flag definition :rtype: str """ - return self._feature_flag_definition + return 
self._object_definition @property def compression(self): # pylint:disable=no-self-use @@ -451,6 +452,56 @@ def __str__(self): """Return string representation.""" return "SegmentChange - changeNumber=%d, name=%s" % (self.change_number, self.segment_name) +class RBSChangeUpdate(BaseUpdate): + """rbs Change notification.""" + + def __init__(self, channel, timestamp, change_number, previous_change_number, rbs_definition, compression): + """Class constructor.""" + BaseUpdate.__init__(self, channel, timestamp, change_number) + self._previous_change_number = previous_change_number + self._object_definition = rbs_definition + self._compression = compression + + @property + def update_type(self): # pylint:disable=no-self-use + """ + Return the message type. + + :returns: The type of this parsed Update. + :rtype: UpdateType + """ + return UpdateType.RB_SEGMENT_UPDATE + + @property + def previous_change_number(self): # pylint:disable=no-self-use + """ + Return previous change number + :returns: The previous change number + :rtype: int + """ + return self._previous_change_number + + @property + def object_definition(self): # pylint:disable=no-self-use + """ + Return rbs definition + :returns: The new rbs definition + :rtype: str + """ + return self._object_definition + + @property + def compression(self): # pylint:disable=no-self-use + """ + Return previous compression type + :returns: The compression type + :rtype: int + """ + return self._compression + + def __str__(self): + """Return string representation.""" + return "RBSChange - changeNumber=%d" % (self.change_number) class ControlMessage(BaseMessage): """Control notification.""" @@ -503,6 +554,9 @@ def _parse_update(channel, timestamp, data): if update_type == UpdateType.SPLIT_UPDATE and change_number is not None: return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c')) + if update_type == UpdateType.RB_SEGMENT_UPDATE and change_number is not None: + return 
RBSChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c')) + elif update_type == UpdateType.SPLIT_KILL and change_number is not None: return SplitKillUpdate(channel, timestamp, change_number, data['splitName'], data['defaultTreatment']) diff --git a/splitio/push/processor.py b/splitio/push/processor.py index e8de95c8..41d796c7 100644 --- a/splitio/push/processor.py +++ b/splitio/push/processor.py @@ -35,12 +35,13 @@ def __init__(self, synchronizer, telemetry_runtime_producer): self._feature_flag_queue = Queue() self._segments_queue = Queue() self._synchronizer = synchronizer - self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer) + self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer, synchronizer.split_sync.rule_based_segment_storage) self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue) self._handlers = { UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update, UpdateType.SPLIT_KILL: self._handle_feature_flag_kill, - UpdateType.SEGMENT_UPDATE: self._handle_segment_change + UpdateType.SEGMENT_UPDATE: self._handle_segment_change, + UpdateType.RB_SEGMENT_UPDATE: self._handle_feature_flag_update } def _handle_feature_flag_update(self, event): @@ -119,12 +120,13 @@ def __init__(self, synchronizer, telemetry_runtime_producer): self._feature_flag_queue = asyncio.Queue() self._segments_queue = asyncio.Queue() self._synchronizer = synchronizer - self._feature_flag_worker = SplitWorkerAsync(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, 
synchronizer.segment_storage, telemetry_runtime_producer) + self._feature_flag_worker = SplitWorkerAsync(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer, synchronizer.split_sync.rule_based_segment_storage) self._segments_worker = SegmentWorkerAsync(synchronizer.synchronize_segment, self._segments_queue) self._handlers = { UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update, UpdateType.SPLIT_KILL: self._handle_feature_flag_kill, - UpdateType.SEGMENT_UPDATE: self._handle_segment_change + UpdateType.SEGMENT_UPDATE: self._handle_segment_change, + UpdateType.RB_SEGMENT_UPDATE: self._handle_feature_flag_update } async def _handle_feature_flag_update(self, event): diff --git a/splitio/push/workers.py b/splitio/push/workers.py index 5161d15d..e4888f36 100644 --- a/splitio/push/workers.py +++ b/splitio/push/workers.py @@ -9,11 +9,13 @@ from enum import Enum from splitio.models.splits import from_raw +from splitio.models.rule_based_segments import from_raw as rbs_from_raw from splitio.models.telemetry import UpdateFromSSE from splitio.push import SplitStorageException from splitio.push.parser import UpdateType from splitio.optional.loaders import asyncio -from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async +from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async, \ + update_rule_based_segment_storage, update_rule_based_segment_storage_async _LOGGER = logging.getLogger(__name__) @@ -25,9 +27,9 @@ class CompressionMode(Enum): ZLIB_COMPRESSION = 2 _compression_handlers = { - CompressionMode.NO_COMPRESSION: lambda event: base64.b64decode(event.feature_flag_definition), - CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'), - 
CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'), + CompressionMode.NO_COMPRESSION: lambda event: base64.b64decode(event.object_definition), + CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.object_definition)).decode('utf-8'), + CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.object_definition)).decode('utf-8'), } class WorkerBase(object, metaclass=abc.ABCMeta): @@ -45,10 +47,19 @@ def start(self): def stop(self): """Stop worker.""" - def _get_feature_flag_definition(self, event): - """return feature flag definition in event.""" + def _get_object_definition(self, event): + """return feature flag or rule based segment definition in event.""" cm = CompressionMode(event.compression) # will throw if the number is not defined in compression mode return _compression_handlers[cm](event) + + def _get_referenced_rbs(self, feature_flag): + referenced_rbs = set() + for condition in feature_flag.conditions: + for matcher in condition.matchers: + raw_matcher = matcher.to_json() + if raw_matcher['matcherType'] == 'IN_RULE_BASED_SEGMENT': + referenced_rbs.add(raw_matcher['userDefinedSegmentMatcherData']['segmentName']) + return referenced_rbs class SegmentWorker(WorkerBase): """Segment Worker for processing updates.""" @@ -173,7 +184,7 @@ class SplitWorker(WorkerBase): _centinel = object() - def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer): + def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer, rule_based_segment_storage): """ Class constructor. 
@@ -189,6 +200,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q :type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage :param telemetry_runtime_producer: Telemetry runtime producer instance :type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer + :param rule_based_segment_storage: Rule based segment Storage. + :type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage """ self._feature_flag_queue = feature_flag_queue self._handler = synchronize_feature_flag @@ -198,6 +211,7 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q self._feature_flag_storage = feature_flag_storage self._segment_storage = segment_storage self._telemetry_runtime_producer = telemetry_runtime_producer + self._rule_based_segment_storage = rule_based_segment_storage def is_running(self): """Return whether the working is running.""" @@ -206,18 +220,30 @@ def is_running(self): def _apply_iff_if_needed(self, event): if not self._check_instant_ff_update(event): return False - try: - new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event))) - segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number) - for segment_name in segment_list: - if self._segment_storage.get(segment_name) is None: - _LOGGER.debug('Fetching new segment %s', segment_name) - self._segment_handler(segment_name, event.change_number) - - self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) + if event.update_type == UpdateType.SPLIT_UPDATE: + new_feature_flag = from_raw(json.loads(self._get_object_definition(event))) + segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number) + for segment_name in segment_list: + if self._segment_storage.get(segment_name) is None: + _LOGGER.debug('Fetching new segment %s', segment_name) + 
self._segment_handler(segment_name, event.change_number) + + referenced_rbs = self._get_referenced_rbs(new_feature_flag) + if len(referenced_rbs) > 0 and not self._rule_based_segment_storage.contains(referenced_rbs): + _LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs) + self._handler(None, event.change_number) + self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) + else: + new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event))) + segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, [new_rbs], event.change_number) + for segment_name in segment_list: + if self._segment_storage.get(segment_name) is None: + _LOGGER.debug('Fetching new segment %s', segment_name) + self._segment_handler(segment_name, event.change_number) + self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE) return True - + except Exception as e: raise SplitStorageException(e) @@ -225,6 +251,9 @@ def _check_instant_ff_update(self, event): if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number(): return True + if event.update_type == UpdateType.RB_SEGMENT_UPDATE and event.compression is not None and event.previous_change_number == self._rule_based_segment_storage.get_change_number(): + return True + return False def _run(self): @@ -239,8 +268,13 @@ def _run(self): try: if self._apply_iff_if_needed(event): continue - - sync_result = self._handler(event.change_number) + till = None + rbs_till = None + if event.update_type == UpdateType.SPLIT_UPDATE: + till = event.change_number + else: + rbs_till = event.change_number + sync_result = self._handler(till, rbs_till) if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414: _LOGGER.error("URI too long exception caught, sync failed") @@ -279,7 +313,7 @@ class SplitWorkerAsync(WorkerBase): 
_centinel = object() - def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer): + def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer, rule_based_segment_storage): """ Class constructor. @@ -295,6 +329,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q :type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage :param telemetry_runtime_producer: Telemetry runtime producer instance :type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer + :param rule_based_segment_storage: Rule based segment Storage. + :type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage """ self._feature_flag_queue = feature_flag_queue self._handler = synchronize_feature_flag @@ -303,7 +339,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q self._feature_flag_storage = feature_flag_storage self._segment_storage = segment_storage self._telemetry_runtime_producer = telemetry_runtime_producer - + self._rule_based_segment_storage = rule_based_segment_storage + def is_running(self): """Return whether the working is running.""" return self._running @@ -312,23 +349,39 @@ async def _apply_iff_if_needed(self, event): if not await self._check_instant_ff_update(event): return False try: - new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event))) - segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number) - for segment_name in segment_list: - if await self._segment_storage.get(segment_name) is None: - _LOGGER.debug('Fetching new segment %s', segment_name) - await self._segment_handler(segment_name, event.change_number) - - await 
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) + if event.update_type == UpdateType.SPLIT_UPDATE: + new_feature_flag = from_raw(json.loads(self._get_object_definition(event))) + segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number) + for segment_name in segment_list: + if await self._segment_storage.get(segment_name) is None: + _LOGGER.debug('Fetching new segment %s', segment_name) + await self._segment_handler(segment_name, event.change_number) + + referenced_rbs = self._get_referenced_rbs(new_feature_flag) + if len(referenced_rbs) > 0 and not await self._rule_based_segment_storage.contains(referenced_rbs): + await self._handler(None, event.change_number) + + await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) + else: + new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event))) + segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, [new_rbs], event.change_number) + for segment_name in segment_list: + if await self._segment_storage.get(segment_name) is None: + _LOGGER.debug('Fetching new segment %s', segment_name) + await self._segment_handler(segment_name, event.change_number) + await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE) return True except Exception as e: raise SplitStorageException(e) - async def _check_instant_ff_update(self, event): if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == await self._feature_flag_storage.get_change_number(): return True + + if event.update_type == UpdateType.RB_SEGMENT_UPDATE and event.compression is not None and event.previous_change_number == await self._rule_based_segment_storage.get_change_number(): + return True + return False async def _run(self): @@ -343,7 +396,13 @@ async def _run(self): try: if await 
self._apply_iff_if_needed(event): continue - await self._handler(event.change_number) + till = None + rbs_till = None + if event.update_type == UpdateType.SPLIT_UPDATE: + till = event.change_number + else: + rbs_till = event.change_number + await self._handler(till, rbs_till) except SplitStorageException as e: # pylint: disable=broad-except _LOGGER.error('Exception Updating Feature Flag') _LOGGER.debug('Exception information: ', exc_info=True) diff --git a/splitio/spec.py b/splitio/spec.py index 1388fcda..cd7588e0 100644 --- a/splitio/spec.py +++ b/splitio/spec.py @@ -1 +1 @@ -SPEC_VERSION = '1.1' +SPEC_VERSION = '1.3' diff --git a/tests/push/test_split_worker.py b/tests/push/test_split_worker.py index d792cada..0d3ac824 100644 --- a/tests/push/test_split_worker.py +++ b/tests/push/test_split_worker.py @@ -1,79 +1,127 @@ """Split Worker tests.""" import time import queue +import base64 import pytest from splitio.api import APIException from splitio.push.workers import SplitWorker, SplitWorkerAsync from splitio.models.notification import SplitChangeNotification from splitio.optional.loaders import asyncio -from splitio.push.parser import SplitChangeUpdate +from splitio.push.parser import SplitChangeUpdate, RBSChangeUpdate from splitio.engine.telemetry import TelemetryStorageProducer, TelemetryStorageProducerAsync from splitio.storage.inmemmory import InMemoryTelemetryStorage, InMemorySplitStorage, InMemorySegmentStorage, \ InMemoryTelemetryStorageAsync, InMemorySplitStorageAsync, InMemorySegmentStorageAsync change_number_received = None - - -def handler_sync(change_number): +rbs = { + "changeNumber": 5, + "name": "sample_rule_based_segment", + "status": "ACTIVE", + "trafficTypeName": "user", + "excluded":{ + "keys":["mauro@split.io","gaston@split.io"], + "segments":[] + }, + "conditions": [ + { + "matcherGroup": { + "combiner": "AND", + "matchers": [ + { + "keySelector": { + "trafficType": "user", + "attribute": "email" + }, + "matcherType": "ENDS_WITH", + "negate": 
False, + "whitelistMatcherData": { + "whitelist": [ + "@split.io" + ] + } + } + ] + } + } + ] + } + +def handler_sync(change_number, rbs_change_number): global change_number_received + global rbs_change_number_received + change_number_received = change_number + rbs_change_number_received = rbs_change_number return -async def handler_async(change_number): +async def handler_async(change_number, rbs_change_number): global change_number_received + global rbs_change_number_received change_number_received = change_number + rbs_change_number_received = rbs_change_number return class SplitWorkerTests(object): - def test_on_error(self, mocker): - q = queue.Queue() - def handler_sync(change_number): - raise APIException('some') - - split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock()) - split_worker.start() - assert split_worker.is_running() - - q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None)) - with pytest.raises(Exception): - split_worker._handler() - - assert split_worker.is_running() - assert split_worker._worker.is_alive() - split_worker.stop() - time.sleep(1) - assert not split_worker.is_running() - assert not split_worker._worker.is_alive() - def test_handler(self, mocker): q = queue.Queue() - split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) global change_number_received + global rbs_change_number_received assert not split_worker.is_running() split_worker.start() assert split_worker.is_running() - - # should call the handler - q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None)) - time.sleep(0.1) - assert change_number_received == 123456789 - + def get_change_number(): return 2345 split_worker._feature_flag_storage.get_change_number = get_change_number + def get_rbs_change_number(): + return 
2345 + split_worker._rule_based_segment_storage.get_change_number = get_rbs_change_number + self._feature_flag_added = None self._feature_flag_deleted = None def update(feature_flag_add, feature_flag_delete, change_number): self._feature_flag_added = feature_flag_add - self._feature_flag_deleted = feature_flag_delete + self._feature_flag_deleted = feature_flag_delete split_worker._feature_flag_storage.update = update split_worker._feature_flag_storage.config_flag_sets_used = 0 + self._rbs_added = None + self._rbs_deleted = None + def update(rbs_add, rbs_delete, change_number): + self._rbs_added = rbs_add + self._rbs_deleted = rbs_delete + split_worker._rule_based_segment_storage.update = update + + # should not call the handler + rbs_change_number_received = 0 + rbs1 = str(rbs) + rbs1 = rbs1.replace("'", "\"") + rbs1 = rbs1.replace("False", "false") + encoded = base64.b64encode(bytes(rbs1, "utf-8")) + q.put(RBSChangeUpdate('some', 'RB_SEGMENT_UPDATE', 123456790, 2345, encoded, 0)) + time.sleep(0.1) + assert rbs_change_number_received == 0 + assert self._rbs_added[0].name == "sample_rule_based_segment" + + # should call the handler + q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None)) + time.sleep(0.1) + assert change_number_received == 123456789 + assert rbs_change_number_received == None + + # should call the handler + q.put(RBSChangeUpdate('some', 'RB_SEGMENT_UPDATE', 123456789, None, None, None)) + time.sleep(0.1) + assert rbs_change_number_received == 123456789 + assert change_number_received == None + + # should call the handler q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 12345, "{}", 1)) time.sleep(0.1) @@ -94,12 +142,32 @@ def update(feature_flag_add, feature_flag_delete, change_number): split_worker.stop() assert not split_worker.is_running() + def test_on_error(self, mocker): + q = queue.Queue() + def handler_sync(change_number): + raise APIException('some') + + split_worker = SplitWorker(handler_sync, 
mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_worker.start() + assert split_worker.is_running() + + q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None)) + with pytest.raises(Exception): + split_worker._handler() + + assert split_worker.is_running() + assert split_worker._worker.is_alive() + split_worker.stop() + time.sleep(1) + assert not split_worker.is_running() + assert not split_worker._worker.is_alive() + def test_compression(self, mocker): q = queue.Queue() telemetry_storage = InMemoryTelemetryStorage() telemetry_producer = TelemetryStorageProducer(telemetry_storage) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() - split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer) + split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer, mocker.Mock()) global change_number_received split_worker.start() def get_change_number(): @@ -148,7 +216,7 @@ def update(feature_flag_add, feature_flag_delete, change_number): def test_edge_cases(self, mocker): q = queue.Queue() - split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_worker = SplitWorker(handler_sync, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) global change_number_received split_worker.start() @@ -201,7 +269,7 @@ def test_fetch_segment(self, mocker): def segment_handler_sync(segment_name, change_number): self.segment_name = segment_name return - split_worker = SplitWorker(handler_sync, segment_handler_sync, q, split_storage, segment_storage, mocker.Mock()) + split_worker = SplitWorker(handler_sync, segment_handler_sync, q, split_storage, segment_storage, mocker.Mock(), mocker.Mock()) split_worker.start() def get_change_number(): @@ -225,7 +293,7 @@ async def test_on_error(self, mocker): def 
handler_sync(change_number): raise APIException('some') - split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) split_worker.start() assert split_worker.is_running() @@ -253,7 +321,7 @@ def _worker_running(self): @pytest.mark.asyncio async def test_handler(self, mocker): q = asyncio.Queue() - split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) assert not split_worker.is_running() split_worker.start() @@ -261,7 +329,8 @@ async def test_handler(self, mocker): assert(self._worker_running()) global change_number_received - + global rbs_change_number_received + # should call the handler await q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456789, None, None, None)) await asyncio.sleep(0.1) @@ -271,6 +340,10 @@ async def get_change_number(): return 2345 split_worker._feature_flag_storage.get_change_number = get_change_number + async def get_rbs_change_number(): + return 2345 + split_worker._rule_based_segment_storage.get_change_number = get_rbs_change_number + self.new_change_number = 0 self._feature_flag_added = None self._feature_flag_deleted = None @@ -289,6 +362,24 @@ async def record_update_from_sse(xx): pass split_worker._telemetry_runtime_producer.record_update_from_sse = record_update_from_sse + self._rbs_added = None + self._rbs_deleted = None + async def update_rbs(rbs_add, rbs_delete, change_number): + self._rbs_added = rbs_add + self._rbs_deleted = rbs_delete + split_worker._rule_based_segment_storage.update = update_rbs + + # should not call the handler + rbs_change_number_received = 0 + rbs1 = str(rbs) + rbs1 = rbs1.replace("'", "\"") + rbs1 = rbs1.replace("False", "false") + encoded = 
base64.b64encode(bytes(rbs1, "utf-8")) + await q.put(RBSChangeUpdate('some', 'RB_SEGMENT_UPDATE', 123456790, 2345, encoded, 0)) + await asyncio.sleep(0.1) + assert rbs_change_number_received == 0 + assert self._rbs_added[0].name == "sample_rule_based_segment" + # should call the handler await q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 12345, "{}", 1)) await asyncio.sleep(0.1) @@ -318,7 +409,7 @@ async def test_compression(self, mocker): telemetry_storage = await InMemoryTelemetryStorageAsync.create() telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() - split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer) + split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), telemetry_runtime_producer, mocker.Mock()) global change_number_received split_worker.start() async def get_change_number(): @@ -343,6 +434,10 @@ async def update(feature_flag_add, feature_flag_delete, change_number): split_worker._feature_flag_storage.update = update split_worker._feature_flag_storage.config_flag_sets_used = 0 + async def contains(rbs): + return False + split_worker._rule_based_segment_storage.contains = contains + # compression 0 await q.put(SplitChangeUpdate('some', 'SPLIT_UPDATE', 123456790, 2345, 
'eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ==', 0)) await asyncio.sleep(0.1) @@ -376,7 +471,7 @@ async def update(feature_flag_add, feature_flag_delete, change_number): @pytest.mark.asyncio async def test_edge_cases(self, mocker): q = asyncio.Queue() - split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_worker = SplitWorkerAsync(handler_async, mocker.Mock(), q, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) global change_number_received split_worker.start() @@ -434,7 +529,7 @@ async def test_fetch_segment(self, mocker): async def segment_handler_sync(segment_name, change_number): self.segment_name = segment_name return - split_worker = SplitWorkerAsync(handler_async, segment_handler_sync, q, split_storage, segment_storage, mocker.Mock()) + split_worker = 
SplitWorkerAsync(handler_async, segment_handler_sync, q, split_storage, segment_storage, mocker.Mock(), mocker.Mock()) split_worker.start() async def get_change_number(): From 7cd34ebff80915d5e58e7c5f0eacb194c8c1424b Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 12 Mar 2025 16:56:28 -0300 Subject: [PATCH 2/6] updated redis, pluggable and localjson storages --- splitio/storage/inmemmory.py | 3 +- splitio/storage/pluggable.py | 244 ++++++++++++++++++++++++++++++++++- splitio/storage/redis.py | 236 ++++++++++++++++++++++++++++++++- splitio/sync/split.py | 104 ++++++++++----- 4 files changed, 549 insertions(+), 38 deletions(-) diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index f7af8825..98fc0543 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -109,6 +109,7 @@ def remove_flag_set(self, flag_sets, feature_flag_name, should_filter): class InMemoryRuleBasedSegmentStorage(RuleBasedSegmentsStorage): """InMemory implementation of a feature flag storage base.""" + def __init__(self): """Constructor.""" self._lock = threading.RLock() @@ -192,7 +193,7 @@ def _set_change_number(self, new_change_number): def get_segment_names(self): """ - Retrieve a list of all excluded segments names. + Retrieve a list of all rule based segments names. :return: List of segment names. 
:rtype: list(str) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 7f0a5287..1cb7e054 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -5,15 +5,253 @@ import threading from splitio.optional.loaders import asyncio -from splitio.models import splits, segments +from splitio.models import splits, segments, rule_based_segments from splitio.models.impressions import Impression from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS,\ MethodLatenciesAsync, MethodExceptionsAsync, TelemetryConfigAsync -from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage +from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage, RuleBasedSegmentsStorage from splitio.util.storage_helper import get_valid_flag_sets, combine_valid_flag_sets _LOGGER = logging.getLogger(__name__) +class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage): + """RedPluggable storage for rule based segments.""" + + _RB_SEGMENT_NAME_LENGTH = 23 + _TILL_LENGTH = 4 + + def __init__(self, pluggable_adapter, prefix=None): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + self._pluggable_adapter = pluggable_adapter + self._prefix = "SPLITIO.rbsegment.${segmen_name}" + self._rb_segments_till_prefix = "SPLITIO.rbsegments.till" + if prefix is not None: + self._prefix = prefix + "." + self._prefix + self._rb_segments_till_prefix = prefix + "." + self._rb_segments_till_prefix + + def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + pass + + def get_change_number(self): + """ + Retrieve latest rule based segment change number. 
+ + :rtype: int + """ + pass + + def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + pass + + def get_segment_names(self): + """ + Retrieve a list of all excluded segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + + def update(self, to_add, to_delete, new_change_number): + """ + Update rule based segment.. + + :param to_add: List of rule based segment. to add + :type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param to_delete: List of rule based segment. to delete + :type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param new_change_number: New change number. + :type new_change_number: int + """ + raise NotImplementedError('Only redis-consumer mode is supported.') + + def get_large_segment_names(self): + """ + Retrieve a list of all excluded large segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + +class PluggableRuleBasedSegmentsStorage(PluggableRuleBasedSegmentsStorageBase): + """RedPluggable storage for rule based segments.""" + + def __init__(self, pluggable_adapter, prefix=None): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + PluggableRuleBasedSegmentsStorageBase.__init__(self, pluggable_adapter, prefix) + + def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. 
+ :type segment_name: str + + :rtype: str + """ + try: + rb_segment = self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name)) + if not rb_segment: + return None + + return rule_based_segments.from_raw(rb_segment) + + except Exception: + _LOGGER.error('Error getting rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + try: + return self._pluggable_adapter.get(self._rb_segments_till_prefix) + + except Exception: + _LOGGER.error('Error getting change number in rule based segment storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return set(segment_names).issubset(self.get_segment_names()) + + def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = [] + for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + if key[-self._TILL_LENGTH:] != 'till': + keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + return keys + + except Exception: + _LOGGER.error('Error getting rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + +class PluggableRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): + """RedPluggable storage for rule based segments.""" + + def __init__(self, pluggable_adapter, prefix=None): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. 
+ :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + PluggableRuleBasedSegmentsStorageBase.__init__(self, pluggable_adapter, prefix) + + async def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + rb_segment = await self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name)) + if not rb_segment: + return None + + return rule_based_segments.from_raw(rb_segment) + + except Exception: + _LOGGER.error('Error getting rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + try: + return await self._pluggable_adapter.get(self._rb_segments_till_prefix) + + except Exception: + _LOGGER.error('Error getting change number in rule based segment storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return await set(segment_names).issubset(self.get_segment_names()) + + async def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. 
+ :rtype: list(str) + """ + try: + keys = [] + for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + if key[-self._TILL_LENGTH:] != 'till': + keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + return keys + + except Exception: + _LOGGER.error('Error getting rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + class PluggableSplitStorageBase(SplitStorage): """InMemory implementation of a feature flag storage.""" @@ -90,7 +328,7 @@ def update(self, to_add, to_delete, new_change_number): :param new_change_number: New change number. :type new_change_number: int """ -# pass + pass # try: # split = self.get(feature_flag_name) # if not split: diff --git a/splitio/storage/redis.py b/splitio/storage/redis.py index 982e0213..60b532e9 100644 --- a/splitio/storage/redis.py +++ b/splitio/storage/redis.py @@ -4,10 +4,10 @@ import threading from splitio.models.impressions import Impression -from splitio.models import splits, segments +from splitio.models import splits, segments, rule_based_segments from splitio.models.telemetry import TelemetryConfig, TelemetryConfigAsync from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \ - ImpressionPipelinedStorage, TelemetryStorage, FlagSetsFilter + ImpressionPipelinedStorage, TelemetryStorage, FlagSetsFilter, RuleBasedSegmentsStorage from splitio.storage.adapters.redis import RedisAdapterException from splitio.storage.adapters.cache_trait import decorate as add_cache, DEFAULT_MAX_AGE from splitio.storage.adapters.cache_trait import LocalMemoryCache, LocalMemoryCacheAsync @@ -16,8 +16,238 @@ _LOGGER = logging.getLogger(__name__) MAX_TAGS = 10 +class RedisRuleBasedSegmentsStorage(RuleBasedSegmentsStorage): + """Redis-based storage for rule based segments.""" + + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' + + def 
__init__(self, redis_client): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + self._redis = redis_client + self._pipe = self._redis.pipeline + + def _get_key(self, segment_name): + """ + Use the provided feature_flag_name to build the appropriate redis key. + + :param feature_flag_name: Name of the feature flag to interact with in redis. + :type feature_flag_name: str + + :return: Redis key. + :rtype: str. + """ + return self._RB_SEGMENT_KEY.format(segment_name=segment_name) + + def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + raw = self._redis.get(self._get_key(segment_name)) + _LOGGER.debug("Fetchting rule based segment [%s] from redis" % segment_name) + _LOGGER.debug(raw) + return rule_based_segments.from_raw(json.loads(raw)) if raw is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def update(self, to_add, to_delete, new_change_number): + """ + Update rule based segment.. + + :param to_add: List of rule based segment. to add + :type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param to_delete: List of rule based segment. to delete + :type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param new_change_number: New change number. + :type new_change_number: int + """ + raise NotImplementedError('Only redis-consumer mode is supported.') + + def get_change_number(self): + """ + Retrieve latest rule based segment change number. 
+ + :rtype: int + """ + try: + stored_value = self._redis.get(self._RB_SEGMENT_TILL_KEY) + _LOGGER.debug("Fetching rule based segment Change Number from redis: %s" % stored_value) + return json.loads(stored_value) if stored_value is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment change number from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return set(segment_names).issubset(self.get_segment_names()) + + def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = self._redis.keys(self._get_key('*')) + _LOGGER.debug("Fetchting rule based segments names from redis: %s" % keys) + return [key.replace(self._get_key(''), '') for key in keys] + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return [] + + def get_large_segment_names(self): + """ + Retrieve a list of all excluded large segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + +class RedisRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): + """Redis-based storage for rule based segments.""" + + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' + + def __init__(self, redis_client): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. 
+ :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + self._redis = redis_client + self._pipe = self._redis.pipeline + + def _get_key(self, segment_name): + """ + Use the provided feature_flag_name to build the appropriate redis key. + + :param feature_flag_name: Name of the feature flag to interact with in redis. + :type feature_flag_name: str + + :return: Redis key. + :rtype: str. + """ + return self._RB_SEGMENT_KEY.format(segment_name=segment_name) + + async def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + raw = await self._redis.get(self._get_key(segment_name)) + _LOGGER.debug("Fetchting rule based segment [%s] from redis" % segment_name) + _LOGGER.debug(raw) + return rule_based_segments.from_raw(json.loads(raw)) if raw is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def update(self, to_add, to_delete, new_change_number): + """ + Update rule based segment.. + + :param to_add: List of rule based segment. to add + :type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param to_delete: List of rule based segment. to delete + :type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param new_change_number: New change number. + :type new_change_number: int + """ + raise NotImplementedError('Only redis-consumer mode is supported.') + + async def get_change_number(self): + """ + Retrieve latest rule based segment change number. 
+ + :rtype: int + """ + try: + stored_value = await self._redis.get(self._RB_SEGMENT_TILL_KEY) + _LOGGER.debug("Fetching rule based segment Change Number from redis: %s" % stored_value) + return json.loads(stored_value) if stored_value is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment change number from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return set(segment_names).issubset(await self.get_segment_names()) + + async def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = await self._redis.keys(self._get_key('*')) + _LOGGER.debug("Fetchting rule based segments names from redis: %s" % keys) + return [key.replace(self._get_key(''), '') for key in keys] + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return [] + + async def get_large_segment_names(self): + """ + Retrieve a list of all excluded large segments names. + + :return: List of segment names. 
+ :rtype: list(str) + """ + pass + class RedisSplitStorageBase(SplitStorage): - """Redis-based storage base for s.""" + """Redis-based storage base for feature flags.""" _FEATURE_FLAG_KEY = 'SPLITIO.split.{feature_flag_name}' _FEATURE_FLAG_TILL_KEY = 'SPLITIO.splits.till' diff --git a/splitio/sync/split.py b/splitio/sync/split.py index d0e4690c..4d4d5a5a 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -456,10 +456,10 @@ def _make_whitelist_condition(whitelist, treatment): 'combiner': 'AND' } } - - def _sanitize_feature_flag(self, parsed): + + def _sanitize_json_elements(self, parsed): """ - implement Sanitization if neded. + Sanitize all json elements. :param parsed: feature flags, till and since elements dict :type parsed: Dict @@ -467,14 +467,14 @@ def _sanitize_feature_flag(self, parsed): :return: sanitized structure dict :rtype: Dict """ - parsed = self._sanitize_json_elements(parsed) - parsed['splits'] = self._sanitize_feature_flag_elements(parsed['splits']) - + parsed = self._satitize_json_section(parsed, 'ff') + parsed = self._satitize_json_section(parsed, 'rbs') + return parsed - def _sanitize_json_elements(self, parsed): + def _satitize_json_section(self, parsed, section_name): """ - Sanitize all json elements. + Sanitize specific json section. 
:param parsed: feature flags, till and since elements dict :type parsed: Dict @@ -482,15 +482,17 @@ def _sanitize_json_elements(self, parsed): :return: sanitized structure dict :rtype: Dict """ - if 'splits' not in parsed: - parsed['splits'] = [] - if 'till' not in parsed or parsed['till'] is None or parsed['till'] < -1: - parsed['till'] = -1 - if 'since' not in parsed or parsed['since'] is None or parsed['since'] < -1 or parsed['since'] > parsed['till']: - parsed['since'] = parsed['till'] + if section_name not in parsed: + parsed['ff'] = {"t": -1, "s": -1, "d": []} + if 'd' not in parsed[section_name]: + parsed[section_name]['d'] = [] + if 't' not in parsed[section_name] or parsed[section_name]['t'] is None or parsed[section_name]['t'] < -1: + parsed[section_name]['t'] = -1 + if 's' not in parsed[section_name] or parsed[section_name]['s'] is None or parsed[section_name]['s'] < -1 or parsed[section_name]['s'] > parsed[section_name]['t']: + parsed[section_name]['s'] = parsed[section_name]['t'] return parsed - + def _sanitize_feature_flag_elements(self, parsed_feature_flags): """ Sanitize all feature flags elements. @@ -523,6 +525,29 @@ def _sanitize_feature_flag_elements(self, parsed_feature_flags): sanitized_feature_flags.append(feature_flag) return sanitized_feature_flags + def _sanitize_rb_segment_elements(self, parsed_rb_segments): + """ + Sanitize all rule based segments elements. 
+ + :param parsed_rb_segments: rule based segments array + :type parsed_rb_segments: [Dict] + + :return: sanitized structure dict + :rtype: [Dict] + """ + sanitized_rb_segments = [] + for rb_segment in parsed_rb_segments: + if 'name' not in rb_segment or rb_segment['name'].strip() == '': + _LOGGER.warning("A rule based segment in json file does not have (Name) or property is empty, skipping.") + continue + for element in [('trafficTypeName', 'user', None, None, None, None), + ('status', 'ACTIVE', None, None, ['ACTIVE', 'ARCHIVED'], None), + ('changeNumber', 0, 0, None, None, None)]: + rb_segment = util._sanitize_object_element(rb_segment, 'rule based segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], not_in_list=element[5]) + rb_segment = self._sanitize_condition(rb_segment) + sanitized_rb_segments.append(rb_segment) + return sanitized_rb_segments + def _sanitize_condition(self, feature_flag): """ Sanitize feature flag and ensure a condition type ROLLOUT and matcher exist with ALL_KEYS elements. @@ -601,7 +626,7 @@ def _convert_yaml_to_feature_flag(cls, parsed): class LocalSplitSynchronizer(LocalSplitSynchronizerBase): """Localhost mode feature_flag synchronizer.""" - def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode.LEGACY): + def __init__(self, filename, feature_flag_storage, rule_based_segment_storage, localhost_mode=LocalhostMode.LEGACY): """ Class constructor. @@ -614,6 +639,7 @@ def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode. 
""" self._filename = filename self._feature_flag_storage = feature_flag_storage + self._rule_based_segment_storage = rule_based_segment_storage self._localhost_mode = localhost_mode self._current_json_sha = "-1" @@ -706,18 +732,23 @@ def _synchronize_json(self): :rtype: [str] """ try: - fetched, till = self._read_feature_flags_from_json_file(self._filename) + parsed = self._read_feature_flags_from_json_file(self._filename) segment_list = set() - fecthed_sha = util._get_sha(json.dumps(fetched)) + fecthed_sha = util._get_sha(json.dumps(parsed)) if fecthed_sha == self._current_json_sha: return [] self._current_json_sha = fecthed_sha - if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL: + if self._feature_flag_storage.get_change_number() > parsed['ff']['t'] and parsed['ff']['t'] != self._DEFAULT_FEATURE_FLAG_TILL: return [] - fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in fetched] - segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, till) + fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in parsed['ff']['d']] + segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, parsed['ff']['t']) + + if self._rule_based_segment_storage.get_change_number() <= parsed['rbs']['t'] or parsed['rbs']['t'] == self._DEFAULT_FEATURE_FLAG_TILL: + fetched_rb_segments = [rule_based_segments.from_raw(rb_segment) for rb_segment in parsed['rbs']['d']] + segment_list.update(update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t'])) + return segment_list except Exception as exc: @@ -737,8 +768,11 @@ def _read_feature_flags_from_json_file(self, filename): try: with open(filename, 'r') as flo: parsed = json.load(flo) - santitized = self._sanitize_feature_flag(parsed) - return santitized['splits'], santitized['till'] + santitized = self._sanitize_json_elements(parsed) + 
santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff']) + santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + return santitized + except Exception as exc: _LOGGER.debug('Exception: ', exc_info=True) raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc @@ -747,7 +781,7 @@ def _read_feature_flags_from_json_file(self, filename): class LocalSplitSynchronizerAsync(LocalSplitSynchronizerBase): """Localhost mode async feature_flag synchronizer.""" - def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode.LEGACY): + def __init__(self, filename, feature_flag_storage, rule_based_segment_storage, localhost_mode=LocalhostMode.LEGACY): """ Class constructor. @@ -760,6 +794,7 @@ def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode. """ self._filename = filename self._feature_flag_storage = feature_flag_storage + self._rule_based_segment_storage = rule_based_segment_storage self._localhost_mode = localhost_mode self._current_json_sha = "-1" @@ -853,18 +888,23 @@ async def _synchronize_json(self): :rtype: [str] """ try: - fetched, till = await self._read_feature_flags_from_json_file(self._filename) + parsed = await self._read_feature_flags_from_json_file(self._filename) segment_list = set() - fecthed_sha = util._get_sha(json.dumps(fetched)) + fecthed_sha = util._get_sha(json.dumps(parsed)) if fecthed_sha == self._current_json_sha: return [] self._current_json_sha = fecthed_sha - if await self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL: + if await self._feature_flag_storage.get_change_number() > parsed['ff']['t'] and parsed['ff']['t'] != self._DEFAULT_FEATURE_FLAG_TILL: return [] - fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in fetched] - segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, till) + fetched_feature_flags = 
[splits.from_raw(feature_flag) for feature_flag in parsed['ff']['d']] + segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, parsed['ff']['t']) + + if await self._rule_based_segment_storage.get_change_number() <= parsed['rbs']['t'] or parsed['rbs']['t'] == self._DEFAULT_FEATURE_FLAG_TILL: + fetched_rb_segments = [rule_based_segments.from_raw(rb_segment) for rb_segment in parsed['rbs']['d']] + segment_list.update(await update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t'])) + return segment_list except Exception as exc: @@ -884,8 +924,10 @@ async def _read_feature_flags_from_json_file(self, filename): try: async with aiofiles.open(filename, 'r') as flo: parsed = json.loads(await flo.read()) - santitized = self._sanitize_feature_flag(parsed) - return santitized['splits'], santitized['till'] + santitized = self._sanitize_json_elements(parsed) + santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff']) + santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + return santitized except Exception as exc: _LOGGER.debug('Exception: ', exc_info=True) raise ValueError("Error parsing file %s. Make sure it's readable." 
% filename) from exc From 4d8327c84cdb0036899fa7f1639a7980b79ae7de Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 13 Mar 2025 15:05:04 -0300 Subject: [PATCH 3/6] Updated redis, pluggable and localjson storages --- splitio/models/rule_based_segments.py | 30 ++- splitio/storage/pluggable.py | 20 +- splitio/storage/redis.py | 4 +- splitio/sync/split.py | 20 +- tests/integration/__init__.py | 4 + tests/storage/test_pluggable.py | 128 +++++++++++- tests/storage/test_redis.py | 163 ++++++++++++++- tests/sync/test_splits_synchronizer.py | 274 +++++++++++++------------ tests/sync/test_synchronizer.py | 2 - 9 files changed, 482 insertions(+), 163 deletions(-) diff --git a/splitio/models/rule_based_segments.py b/splitio/models/rule_based_segments.py index 4ff548b2..66ec7ddf 100644 --- a/splitio/models/rule_based_segments.py +++ b/splitio/models/rule_based_segments.py @@ -11,14 +11,14 @@ class RuleBasedSegment(object): """RuleBasedSegment object class.""" - def __init__(self, name, traffic_yype_Name, change_number, status, conditions, excluded): + def __init__(self, name, traffic_type_name, change_number, status, conditions, excluded): """ Class constructor. :param name: Segment name. :type name: str - :param traffic_yype_Name: traffic type name. - :type traffic_yype_Name: str + :param traffic_type_name: traffic type name. + :type traffic_type_name: str :param change_number: change number. :type change_number: str :param status: status. 
@@ -29,7 +29,7 @@ def __init__(self, name, traffic_yype_Name, change_number, status, conditions, e :type excluded: Excluded """ self._name = name - self._traffic_yype_Name = traffic_yype_Name + self._traffic_type_name = traffic_type_name self._change_number = change_number self._status = status self._conditions = conditions @@ -41,9 +41,9 @@ def name(self): return self._name @property - def traffic_yype_Name(self): + def traffic_type_name(self): """Return traffic type name.""" - return self._traffic_yype_Name + return self._traffic_type_name @property def change_number(self): @@ -65,6 +65,17 @@ def excluded(self): """Return excluded.""" return self._excluded + def to_json(self): + """Return a JSON representation of this rule based segment.""" + return { + 'changeNumber': self.change_number, + 'trafficTypeName': self.traffic_type_name, + 'name': self.name, + 'status': self.status, + 'conditions': [c.to_json() for c in self.conditions], + 'excluded': self.excluded.to_json() + } + def from_raw(raw_rule_based_segment): """ Parse a Rule based segment from a JSON portion of splitChanges. 
@@ -111,3 +122,10 @@ def get_excluded_keys(self): def get_excluded_segments(self): """Return excluded segments""" return self._segments + + def to_json(self): + """Return a JSON representation of this object.""" + return { + 'keys': self._keys, + 'segments': self._segments + } diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 1cb7e054..66fad1e5 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -17,7 +17,6 @@ class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage): """RedPluggable storage for rule based segments.""" - _RB_SEGMENT_NAME_LENGTH = 23 _TILL_LENGTH = 4 def __init__(self, pluggable_adapter, prefix=None): @@ -28,9 +27,11 @@ def __init__(self, pluggable_adapter, prefix=None): :type redis_client: splitio.storage.adapters.redis.RedisAdapter """ self._pluggable_adapter = pluggable_adapter - self._prefix = "SPLITIO.rbsegment.${segmen_name}" + self._prefix = "SPLITIO.rbsegment.{segment_name}" self._rb_segments_till_prefix = "SPLITIO.rbsegments.till" + self._rb_segment_name_length = 18 if prefix is not None: + self._rb_segment_name_length += len(prefix) + 1 self._prefix = prefix + "." + self._prefix self._rb_segments_till_prefix = prefix + "." 
+ self._rb_segments_till_prefix
@@ -163,10 +164,10 @@ def get_segment_names(self):
         :rtype: list(str)
         """
         try:
             keys = []
-            for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):
+            for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:self._rb_segment_name_length]):
                 if key[-self._TILL_LENGTH:] != 'till':
-                    keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):])
+                    keys.append(key[len(self._prefix[:self._rb_segment_name_length]):])
 
             return keys
         except Exception:
@@ -174,7 +175,7 @@ def get_segment_names(self):
             _LOGGER.debug('Error: ', exc_info=True)
             return None
 
-class PluggableRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage):
+class PluggableRuleBasedSegmentsStorageAsync(PluggableRuleBasedSegmentsStorageBase):
     """RedPluggable storage for rule based segments."""
 
     def __init__(self, pluggable_adapter, prefix=None):
@@ -231,7 +232,7 @@ async def contains(self, segment_names):
         :return: True if segment names exists. False otherwise.
:rtype: bool """ - return await set(segment_names).issubset(self.get_segment_names()) + return set(segment_names).issubset(await self.get_segment_names()) async def get_segment_names(self): """ @@ -242,9 +246,9 @@ async def get_segment_names(self): """ try: keys = [] - for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:self._rb_segment_name_length]): if key[-self._TILL_LENGTH:] != 'till': - keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + keys.append(key[len(self._prefix[:self._rb_segment_name_length]):]) return keys except Exception: diff --git a/splitio/storage/redis.py b/splitio/storage/redis.py index 60b532e9..e5398cf7 100644 --- a/splitio/storage/redis.py +++ b/splitio/storage/redis.py @@ -19,7 +19,7 @@ class RedisRuleBasedSegmentsStorage(RuleBasedSegmentsStorage): """Redis-based storage for rule based segments.""" - _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.{segment_name}' _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' def __init__(self, redis_client): @@ -134,7 +134,7 @@ def get_large_segment_names(self): class RedisRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): """Redis-based storage for rule based segments.""" - _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.{segment_name}' _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' def __init__(self, redis_client): diff --git a/splitio/sync/split.py b/splitio/sync/split.py index 4d4d5a5a..58ea900a 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -545,6 +545,7 @@ def _sanitize_rb_segment_elements(self, parsed_rb_segments): ('changeNumber', 0, 0, None, None, None)]: rb_segment = util._sanitize_object_element(rb_segment, 'rule based segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], 
not_in_list=element[5])
             rb_segment = self._sanitize_condition(rb_segment)
+            rb_segment = self._remove_partition(rb_segment)
             sanitized_rb_segments.append(rb_segment)
 
         return sanitized_rb_segments
@@ -599,6 +600,15 @@ def _sanitize_condition(self, feature_flag):
             })
 
         return feature_flag
+
+    def _remove_partition(self, rb_segment):
+        sanitized = []
+        for condition in rb_segment['conditions']:
+            if 'partitions' in condition:
+                del condition['partitions']
+            sanitized.append(condition)
+        rb_segment['conditions'] = sanitized
+        return rb_segment
 
     @classmethod
     def _convert_yaml_to_feature_flag(cls, parsed):
@@ -769,8 +779,8 @@ def _read_feature_flags_from_json_file(self, filename):
             with open(filename, 'r') as flo:
                 parsed = json.load(flo)
                 santitized = self._sanitize_json_elements(parsed)
-                santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff'])
-                santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs'])
+                santitized['ff']['d'] = self._sanitize_feature_flag_elements(santitized['ff']['d'])
+                santitized['rbs']['d'] = self._sanitize_rb_segment_elements(santitized['rbs']['d'])
                 return santitized
 
         except Exception as exc:
@@ -903,7 +913,7 @@ async def _synchronize_json(self):
 
             if await self._rule_based_segment_storage.get_change_number() <= parsed['rbs']['t'] or parsed['rbs']['t'] == self._DEFAULT_FEATURE_FLAG_TILL:
                 fetched_rb_segments = [rule_based_segments.from_raw(rb_segment) for rb_segment in parsed['rbs']['d']]
-                segment_list.update(await update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t']))
+                segment_list.update(await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t']))
 
             return segment_list
 
@@ -925,8 +935,8 @@ async def _read_feature_flags_from_json_file(self, filename):
             async with aiofiles.open(filename, 'r') as flo:
                 parsed = json.loads(await flo.read())
                 santitized = self._sanitize_json_elements(parsed)
-                santitized['ff'] = 
self._sanitize_feature_flag_elements(santitized['ff']) - santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + santitized['ff']['d'] = self._sanitize_feature_flag_elements(santitized['ff']['d']) + santitized['rbs']['d'] = self._sanitize_rb_segment_elements(santitized['rbs']['d']) return santitized except Exception as exc: _LOGGER.debug('Exception: ', exc_info=True) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index ee2475df..ab6e3293 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -47,3 +47,7 @@ "splitChange6_2": split62, "splitChange6_3": split63, } + +rbsegments_json = { + "segment1": {"changeNumber": 12, "name": "some_segment", "status": "ACTIVE","trafficTypeName": "user","excluded":{"keys":[],"segments":[]},"conditions": []} +} \ No newline at end of file diff --git a/tests/storage/test_pluggable.py b/tests/storage/test_pluggable.py index 439049e5..953a4510 100644 --- a/tests/storage/test_pluggable.py +++ b/tests/storage/test_pluggable.py @@ -1,20 +1,21 @@ """Pluggable storage test module.""" import json import threading +import copy import pytest from splitio.optional.loaders import asyncio from splitio.models.splits import Split -from splitio.models import splits, segments +from splitio.models import splits, segments, rule_based_segments from splitio.models.segments import Segment from splitio.models.impressions import Impression from splitio.models.events import Event, EventWrapper from splitio.storage.pluggable import PluggableSplitStorage, PluggableSegmentStorage, PluggableImpressionsStorage, PluggableEventsStorage, \ PluggableTelemetryStorage, PluggableEventsStorageAsync, PluggableSegmentStorageAsync, PluggableImpressionsStorageAsync,\ - PluggableSplitStorageAsync, PluggableTelemetryStorageAsync + PluggableSplitStorageAsync, PluggableTelemetryStorageAsync, PluggableRuleBasedSegmentsStorage, PluggableRuleBasedSegmentsStorageAsync from splitio.client.util import 
get_metadata, SdkMetadata from splitio.models.telemetry import MAX_TAGS, MethodExceptionsAndLatencies, OperationMode -from tests.integration import splits_json +from tests.integration import splits_json, rbsegments_json class StorageMockAdapter(object): def __init__(self): @@ -1372,3 +1373,124 @@ async def test_push_config_stats(self): await pluggable_telemetry_storage.record_active_and_redundant_factories(2, 1) await pluggable_telemetry_storage.push_config_stats() assert(self.mock_adapter._keys[pluggable_telemetry_storage._telemetry_config_key + "::" + pluggable_telemetry_storage._sdk_metadata] == '{"aF": 2, "rF": 1, "sT": "memory", "oM": 0, "t": []}') + +class PluggableRuleBasedSegmentStorageTests(object): + """In memory rule based segment storage test cases.""" + + def setup_method(self): + """Prepare storages with test data.""" + self.mock_adapter = StorageMockAdapter() + + def test_get(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs_name = rbsegments_json['segment1']['name'] + + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs_name), rbs1.to_json()) + assert(pluggable_rbs_storage.get(rbs_name).to_json() == rule_based_segments.from_raw(rbsegments_json['segment1']).to_json()) + assert(pluggable_rbs_storage.get('not_existing') == None) + + def test_get_change_number(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + if sprefix == 'myprefix': + prefix = 'myprefix.' 
+ else: + prefix = '' + self.mock_adapter.set(prefix + "SPLITIO.rbsegments.till", 1234) + assert(pluggable_rbs_storage.get_change_number() == 1234) + + def test_get_segment_names(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + assert(pluggable_rbs_storage.get_segment_names() == [rbs1.name, rbs2.name]) + + def test_contains(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + + assert(pluggable_rbs_storage.contains([rbs1.name, rbs2.name])) + assert(pluggable_rbs_storage.contains([rbs2.name])) + assert(not pluggable_rbs_storage.contains(['none-exists', rbs2.name])) + assert(not pluggable_rbs_storage.contains(['none-exists', 'none-exists2'])) + +class PluggableRuleBasedSegmentStorageAsyncTests(object): + """In memory rule based segment storage test cases.""" + + def setup_method(self): + """Prepare storages with test data.""" + self.mock_adapter = StorageMockAdapterAsync() + + @pytest.mark.asyncio + async def 
test_get(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs_name = rbsegments_json['segment1']['name'] + + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs_name), rbs1.to_json()) + rbs = await pluggable_rbs_storage.get(rbs_name) + assert(rbs.to_json() == rule_based_segments.from_raw(rbsegments_json['segment1']).to_json()) + assert(await pluggable_rbs_storage.get('not_existing') == None) + + @pytest.mark.asyncio + async def test_get_change_number(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + if sprefix == 'myprefix': + prefix = 'myprefix.' + else: + prefix = '' + await self.mock_adapter.set(prefix + "SPLITIO.rbsegments.till", 1234) + assert(await pluggable_rbs_storage.get_change_number() == 1234) + + @pytest.mark.asyncio + async def test_get_segment_names(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + assert(await pluggable_rbs_storage.get_segment_names() == [rbs1.name, rbs2.name]) + + @pytest.mark.asyncio + async def test_contains(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = 
PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + + assert(await pluggable_rbs_storage.contains([rbs1.name, rbs2.name])) + assert(await pluggable_rbs_storage.contains([rbs2.name])) + assert(not await pluggable_rbs_storage.contains(['none-exists', rbs2.name])) + assert(not await pluggable_rbs_storage.contains(['none-exists', 'none-exists2'])) diff --git a/tests/storage/test_redis.py b/tests/storage/test_redis.py index cce9a43d..04ddfc60 100644 --- a/tests/storage/test_redis.py +++ b/tests/storage/test_redis.py @@ -12,7 +12,8 @@ from splitio.optional.loaders import asyncio from splitio.storage import FlagSetsFilter from splitio.storage.redis import RedisEventsStorage, RedisEventsStorageAsync, RedisImpressionsStorage, RedisImpressionsStorageAsync, \ - RedisSegmentStorage, RedisSegmentStorageAsync, RedisSplitStorage, RedisSplitStorageAsync, RedisTelemetryStorage, RedisTelemetryStorageAsync + RedisSegmentStorage, RedisSegmentStorageAsync, RedisSplitStorage, RedisSplitStorageAsync, RedisTelemetryStorage, RedisTelemetryStorageAsync, \ + RedisRuleBasedSegmentsStorage, RedisRuleBasedSegmentsStorageAsync from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterException, build from redis.asyncio.client import Redis as aioredis from splitio.storage.adapters import redis @@ -1230,3 +1231,163 @@ async def expire(*args): await redis_telemetry.expire_keys('key', 12, 2, 2) assert(self.called) + +class RedisRuleBasedSegmentStorageTests(object): + """Redis rule based segment storage test cases.""" + + def 
test_get_segment(self, mocker): + """Test retrieving a rule based segment works.""" + adapter = mocker.Mock(spec=RedisAdapter) + adapter.get.return_value = '{"name": "some_segment"}' + from_raw = mocker.Mock() + mocker.patch('splitio.storage.redis.rule_based_segments.from_raw', new=from_raw) + + storage = RedisRuleBasedSegmentsStorage(adapter) + storage.get('some_segment') + + assert adapter.get.mock_calls == [mocker.call('SPLITIO.rbsegment.some_segment')] + assert from_raw.mock_calls == [mocker.call({"name": "some_segment"})] + + # Test that a missing split returns None and doesn't call from_raw + adapter.reset_mock() + from_raw.reset_mock() + adapter.get.return_value = None + result = storage.get('some_segment') + assert result is None + assert adapter.get.mock_calls == [mocker.call('SPLITIO.rbsegment.some_segment')] + assert not from_raw.mock_calls + + def test_get_changenumber(self, mocker): + """Test fetching changenumber.""" + adapter = mocker.Mock(spec=RedisAdapter) + storage = RedisRuleBasedSegmentsStorage(adapter) + adapter.get.return_value = '-1' + assert storage.get_change_number() == -1 + assert adapter.get.mock_calls == [mocker.call('SPLITIO.rbsegments.till')] + + def test_get_segment_names(self, mocker): + """Test getching rule based segment names.""" + adapter = mocker.Mock(spec=RedisAdapter) + storage = RedisRuleBasedSegmentsStorage(adapter) + adapter.keys.return_value = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + assert storage.get_segment_names() == ['segment1', 'segment2', 'segment3'] + + def test_contains(self, mocker): + """Test storage containing rule based segment names.""" + adapter = mocker.Mock(spec=RedisAdapter) + storage = RedisRuleBasedSegmentsStorage(adapter) + adapter.keys.return_value = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + assert storage.contains(['segment1', 'segment3']) + assert not 
storage.contains(['segment1', 'segment4']) + assert storage.contains(['segment1']) + assert not storage.contains(['segment4', 'segment5']) + +class RedisRuleBasedSegmentStorageAsyncTests(object): + """Redis rule based segment storage test cases.""" + + @pytest.mark.asyncio + async def test_get_segment(self, mocker): + """Test retrieving a rule based segment works.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + + self.redis_ret = None + self.name = None + async def get(sel, name): + self.name = name + self.redis_ret = '{"changeNumber": "12", "name": "some_segment", "status": "ACTIVE","trafficTypeName": "user","excluded":{"keys":[],"segments":[]},"conditions": []}' + return self.redis_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.get', new=get) + + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + await storage.get('some_segment') + + assert self.name == 'SPLITIO.rbsegment.some_segment' + assert self.redis_ret == '{"changeNumber": "12", "name": "some_segment", "status": "ACTIVE","trafficTypeName": "user","excluded":{"keys":[],"segments":[]},"conditions": []}' + + # Test that a missing split returns None and doesn't call from_raw + + self.name = None + async def get2(sel, name): + self.name = name + return None + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.get', new=get2) + + result = await storage.get('some_segment') + assert result is None + assert self.name == 'SPLITIO.rbsegment.some_segment' + + # Test that a missing split returns None and doesn't call from_raw + result = await storage.get('some_segment2') + assert result is None + + @pytest.mark.asyncio + async def test_get_changenumber(self, mocker): + """Test fetching changenumber.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + + self.redis_ret = 
None + self.name = None + async def get(sel, name): + self.name = name + self.redis_ret = '-1' + return self.redis_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.get', new=get) + + assert await storage.get_change_number() == -1 + assert self.name == 'SPLITIO.rbsegments.till' + + @pytest.mark.asyncio + async def test_get_segment_names(self, mocker): + """Test getching rule based segment names.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + + self.key = None + self.keys_ret = None + async def keys(sel, key): + self.key = key + self.keys_ret = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + return self.keys_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.keys', new=keys) + + assert await storage.get_segment_names() == ['segment1', 'segment2', 'segment3'] + + @pytest.mark.asyncio + async def test_contains(self, mocker): + """Test storage containing rule based segment names.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + + self.key = None + self.keys_ret = None + async def keys(sel, key): + self.key = key + self.keys_ret = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + return self.keys_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.keys', new=keys) + + assert await storage.contains(['segment1', 'segment3']) + assert not await storage.contains(['segment1', 'segment4']) + assert await storage.contains(['segment1']) + assert not await storage.contains(['segment4', 'segment5']) diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 2acf293f..ce1ade7e 100644 --- 
a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -8,14 +8,14 @@ from splitio.util.backoff import Backoff from splitio.api import APIException from splitio.api.commons import FetchOptions -from splitio.storage import SplitStorage +from splitio.storage import SplitStorage, RuleBasedSegmentsStorage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySplitStorageAsync, InMemoryRuleBasedSegmentStorage, InMemoryRuleBasedSegmentStorageAsync from splitio.storage import FlagSetsFilter from splitio.models.splits import Split from splitio.models.rule_based_segments import RuleBasedSegment from splitio.sync.split import SplitSynchronizer, SplitSynchronizerAsync, LocalSplitSynchronizer, LocalSplitSynchronizerAsync, LocalhostMode from splitio.optional.loaders import aiofiles, asyncio -from tests.integration import splits_json +from tests.integration import splits_json, rbsegments_json splits_raw = [{ 'changeNumber': 123, @@ -861,12 +861,13 @@ async def get_changes(*args, **kwargs): class LocalSplitsSynchronizerTests(object): """Split synchronizer test cases.""" - splits = copy.deepcopy(splits_raw) + payload = copy.deepcopy(json_body) def test_synchronize_splits_error(self, mocker): """Test that if fetching splits fails at some_point, the task will continue running.""" storage = mocker.Mock(spec=SplitStorage) - split_synchronizer = LocalSplitSynchronizer("/incorrect_file", storage) + rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) + split_synchronizer = LocalSplitSynchronizer("/incorrect_file", storage, rbs_storage) with pytest.raises(Exception): split_synchronizer.synchronize_splits(1) @@ -874,74 +875,75 @@ def test_synchronize_splits_error(self, mocker): def test_synchronize_splits(self, mocker): """Test split sync.""" storage = InMemorySplitStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage() - till = 123 def read_splits_from_json_file(*args, **kwargs): - return self.splits, till + return self.payload - 
split_synchronizer = LocalSplitSynchronizer("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizer("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_splits_from_json_file split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name' # Should sync when changenumber is not changed - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should not sync when changenumber is less than stored - till = 122 - self.splits[0]['killed'] = False + self.payload["ff"]["t"] = 122 + self.payload["ff"]["d"][0]['killed'] = False split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should sync when changenumber is higher than stored - till = 124 + self.payload["ff"]["t"] = 1675095324999 split_synchronizer._current_json_sha = "-1" split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == False # Should sync when till is default (-1) - till = -1 + self.payload["ff"]["t"] = -1 split_synchronizer._current_json_sha = "-1" - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == True def 
test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorage(['set1', 'set2']) - - split = self.splits[0].copy() + rbs_storage = InMemoryRuleBasedSegmentStorage() + + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'new_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizer("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizer("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file split_synchronizer.synchronize_splits() @@ -959,30 +961,31 @@ def read_feature_flags_from_json_file(*args, **kwargs): def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage() - split = self.splits[0].copy() + split = self.payload["ff"]["d"][0].copy() 
split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'third_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizer("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizer("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file split_synchronizer.synchronize_splits() @@ -1000,95 +1003,73 @@ def read_feature_flags_from_json_file(*args, **kwargs): def test_reading_json(self, mocker): """Test reading json file.""" f = open("./splits.json", "w") - json_body = {'splits': [{ - 'changeNumber': 123, - 'trafficTypeName': 'user', - 'name': 'some_name', - 'trafficAllocation': 100, - 'trafficAllocationSeed': 123456, - 'seed': 321654, - 'status': 'ACTIVE', - 'killed': False, - 'defaultTreatment': 'off', - 'algo': 2, - 'conditions': [ - { - 'partitions': [ - {'treatment': 'on', 'size': 50}, - {'treatment': 'off', 'size': 50} - ], - 'contitionType': 'WHITELIST', - 'label': 
'some_label', - 'matcherGroup': { - 'matchers': [ - { - 'matcherType': 'WHITELIST', - 'whitelistMatcherData': { - 'whitelist': ['k1', 'k2', 'k3'] - }, - 'negate': False, - } - ], - 'combiner': 'AND' - } - } - ], - 'sets': ['set1'] - }], - "till":1675095324253, - "since":-1, - } - - f.write(json.dumps(json_body)) + f.write(json.dumps(self.payload)) f.close() storage = InMemorySplitStorage() - split_synchronizer = LocalSplitSynchronizer("./splits.json", storage, LocalhostMode.JSON) + rbs_storage = InMemoryRuleBasedSegmentStorage() + split_synchronizer = LocalSplitSynchronizer("./splits.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer.synchronize_splits() - inserted_split = storage.get(json_body['splits'][0]['name']) + inserted_split = storage.get(self.payload['ff']['d'][0]['name']) assert isinstance(inserted_split, Split) - assert inserted_split.name == 'some_name' + assert inserted_split.name == self.payload['ff']['d'][0]['name'] + + inserted_rbs = rbs_storage.get(self.payload['rbs']['d'][0]['name']) + assert isinstance(inserted_rbs, RuleBasedSegment) + assert inserted_rbs.name == self.payload['rbs']['d'][0]['name'] os.remove("./splits.json") def test_json_elements_sanitization(self, mocker): """Test sanitization.""" - split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) # check no changes if all elements exist with valid values - parsed = {"splits": [], "since": -1, "till": -1} + parsed = {"ff": {"d": [], "s": -1, "t": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed) == parsed) # check set since to -1 when is None parsed2 = parsed.copy() - parsed2['since'] = None + parsed2['ff']['s'] = None assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check no changes if since > -1 parsed2 = parsed.copy() - parsed2['since'] = 12 + 
parsed2['ff']['s'] = 12 assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check set till to -1 when is None parsed2 = parsed.copy() - parsed2['till'] = None + parsed2['ff']['t'] = None assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check add since when missing - parsed2 = {"splits": [], "till": -1} + parsed2 = {"ff": {"d": [], "t": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check add till when missing - parsed2 = {"splits": [], "since": -1} + parsed2 = {"ff": {"d": [], "s": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check add splits when missing - parsed2 = {"since": -1, "till": -1} + parsed2 = {"ff": {"s": -1, "t": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) - def test_split_elements_sanitization(self, mocker): + # check add since when missing + parsed2 = {"ff": {"d": [], "t": -1}, "rbs": {"d": [], "t": -1}} + assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) + + # check add till when missing + parsed2 = {"ff": {"d": [], "s": -1}, "rbs": {"d": [], "s": -1}} + assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) + + # check add splits when missing + parsed2 = {"ff": {"s": -1, "t": -1}, "rbs": {"s": -1, "t": -1}} + assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) + + def test_elements_sanitization(self, mocker): """Test sanitization.""" - split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) # No changes when split structure is good assert (split_synchronizer._sanitize_feature_flag_elements(splits_json["splitChange1_1"]["splits"]) == splits_json["splitChange1_1"]["splits"]) @@ -1183,7 +1164,21 @@ def 
test_split_elements_sanitization(self, mocker): split[0]['algo'] = 1 assert (split_synchronizer._sanitize_feature_flag_elements(split)[0]['algo'] == 2) - def test_split_condition_sanitization(self, mocker): + # test 'status' is set to ACTIVE when None + rbs = copy.deepcopy(json_body["rbs"]["d"]) + rbs[0]['status'] = None + assert (split_synchronizer._sanitize_rb_segment_elements(rbs)[0]['status'] == 'ACTIVE') + + # test 'changeNumber' is set to 0 when invalid + rbs = copy.deepcopy(json_body["rbs"]["d"]) + rbs[0]['changeNumber'] = -2 + assert (split_synchronizer._sanitize_rb_segment_elements(rbs)[0]['changeNumber'] == 0) + + rbs = copy.deepcopy(json_body["rbs"]["d"]) + del rbs[0]['conditions'] + assert (len(split_synchronizer._sanitize_rb_segment_elements(rbs)[0]['conditions']) == 1) + + def test_condition_sanitization(self, mocker): """Test sanitization.""" split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) @@ -1218,13 +1213,14 @@ def test_split_condition_sanitization(self, mocker): class LocalSplitsSynchronizerAsyncTests(object): """Split synchronizer test cases.""" - splits = copy.deepcopy(splits_raw) + payload = copy.deepcopy(json_body) @pytest.mark.asyncio async def test_synchronize_splits_error(self, mocker): """Test that if fetching splits fails at some_point, the task will continue running.""" storage = mocker.Mock(spec=SplitStorage) - split_synchronizer = LocalSplitSynchronizerAsync("/incorrect_file", storage) + rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) + split_synchronizer = LocalSplitSynchronizerAsync("/incorrect_file", storage, rbs_storage) with pytest.raises(Exception): await split_synchronizer.synchronize_splits(1) @@ -1233,75 +1229,76 @@ async def test_synchronize_splits_error(self, mocker): async def test_synchronize_splits(self, mocker): """Test split sync.""" storage = InMemorySplitStorageAsync() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() - till = 123 async def 
read_splits_from_json_file(*args, **kwargs): - return self.splits, till + return self.payload - split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_splits_from_json_file await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name' # Should sync when changenumber is not changed - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should not sync when changenumber is less than stored - till = 122 - self.splits[0]['killed'] = False + self.payload["ff"]["t"] = 122 + self.payload["ff"]["d"][0]['killed'] = False await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should sync when changenumber is higher than stored - till = 124 + self.payload["ff"]["t"] = 1675095324999 split_synchronizer._current_json_sha = "-1" await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == False # Should sync when till is default (-1) - till = -1 + self.payload["ff"]["t"] = -1 split_synchronizer._current_json_sha = "-1" - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True await split_synchronizer.synchronize_splits() - 
inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == True @pytest.mark.asyncio async def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorageAsync(['set1', 'set2']) - - split = self.splits[0].copy() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 async def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'new_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file await split_synchronizer.synchronize_splits() @@ -1320,30 +1317,30 @@ async def read_feature_flags_from_json_file(*args, **kwargs): async def 
test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorageAsync() - - split = self.splits[0].copy() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 async def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: - splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'third_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file await split_synchronizer.synchronize_splits() @@ -1362,13 +1359,18 @@ async def read_feature_flags_from_json_file(*args, **kwargs): async def test_reading_json(self, mocker): """Test reading json file.""" async with aiofiles.open("./splits.json", "w") as f: - await f.write(json.dumps(json_body)) + await f.write(json.dumps(self.payload)) storage = 
InMemorySplitStorageAsync() - split_synchronizer = LocalSplitSynchronizerAsync("./splits.json", storage, LocalhostMode.JSON) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + split_synchronizer = LocalSplitSynchronizerAsync("./splits.json", storage, rbs_storage, LocalhostMode.JSON) await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(json_body['splits'][0]['name']) + inserted_split = await storage.get(self.payload['ff']['d'][0]['name']) assert isinstance(inserted_split, Split) - assert inserted_split.name == 'some_name' + assert inserted_split.name == self.payload['ff']['d'][0]['name'] + + inserted_rbs = await rbs_storage.get(self.payload['rbs']['d'][0]['name']) + assert isinstance(inserted_rbs, RuleBasedSegment) + assert inserted_rbs.name == self.payload['rbs']['d'][0]['name'] os.remove("./splits.json") diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index b2ef9fa0..1e89af66 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -671,7 +671,6 @@ def test_start_periodic_data_recording(self, mocker): assert len(impression_count_task.start.mock_calls) == 1 assert len(event_task.start.mock_calls) == 1 - class RedisSynchronizerTests(object): def test_start_periodic_data_recording(self, mocker): impression_count_task = mocker.Mock(spec=ImpressionsCountSyncTask) @@ -744,7 +743,6 @@ def stop_mock(event): assert len(unique_keys_task.stop.mock_calls) == 1 assert len(clear_filter_task.stop.mock_calls) == 1 - class RedisSynchronizerAsyncTests(object): @pytest.mark.asyncio async def test_start_periodic_data_recording(self, mocker): From 2cbc6474bc6ec5f41615f9f3f0df3ae50c4603cb Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:47:13 -0300 Subject: [PATCH 4/6] Update splitio/storage/pluggable.py Co-authored-by: Emiliano Sanchez --- splitio/storage/pluggable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 66fad1e5..1ac12bd2 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -15,7 +15,7 @@ _LOGGER = logging.getLogger(__name__) class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage): - """RedPluggable storage for rule based segments.""" + """Pluggable storage for rule based segments.""" _TILL_LENGTH = 4 From d0b2c6729f9f0f18a271da2d9472d6a3652bc638 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:47:20 -0300 Subject: [PATCH 5/6] Update splitio/storage/pluggable.py Co-authored-by: Emiliano Sanchez --- splitio/storage/pluggable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 1ac12bd2..20d4d437 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -98,7 +98,7 @@ def get_large_segment_names(self): pass class PluggableRuleBasedSegmentsStorage(PluggableRuleBasedSegmentsStorageBase): - """RedPluggable storage for rule based segments.""" + """Pluggable storage for rule based segments.""" def __init__(self, pluggable_adapter, prefix=None): """ From cc990a9bedcb7de033ac8f07b8ee416023a5bebe Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:47:27 -0300 Subject: [PATCH 6/6] Update splitio/storage/pluggable.py Co-authored-by: Emiliano Sanchez --- splitio/storage/pluggable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 20d4d437..c27a92fd 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -179,7 +179,7 @@ def get_segment_names(self): return None class PluggableRuleBasedSegmentsStorageAsync(PluggableRuleBasedSegmentsStorageBase): - """RedPluggable storage for rule based segments.""" + """Pluggable 
storage for rule based segments.""" def __init__(self, pluggable_adapter, prefix=None): """