From 52f0e6ba318111bf946e87ff66ecb9fc1e711525 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Fri, 12 Apr 2024 15:59:40 +0200 Subject: [PATCH 1/2] Fix cleaning > 10000 channels Elastic/ChannelFinder has a setting, ES_QUERY_SIZE. By default, queries only return up to ES_QUERY_SIZE channels, which by default is 10000. When the clean runs, it uses this API to fetch all channels where the pvStatus is Active. But if there are more than 10000 such channels, this won't work, because the result is truncated at ES_QUERY_SIZE. The solution here is to keep setting channels to Inactive and querying for more channels until there are no channels set to Active. --- server/recceiver/cfstore.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 097acaa..9db39f7 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -282,17 +282,14 @@ def clean_service(self): while 1: try: _log.info("CF Clean Started") - channels = self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)])) + channels = self.get_active_channels(recceiverid) if channels is not None: - new_channels = [] - for ch in channels or []: - new_channels.append(ch[u'name']) - _log.info("Total channels to update: {nChannels}", nChannels=len(new_channels)) - while len(new_channels) > 0: - _log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000)) - self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, - channelNames=new_channels[:10000]) - new_channels = new_channels[10000:] + while channels is not None and len(channels) > 0: + self.clean_channels(owner, channels) + channels = self.get_active_channels(recceiverid) + _log.info("CF Clean Completed") + return + else: _log.info("CF Clean Completed") return except RequestException as e: @@ -305,6 +302,18 @@ def clean_service(self): 
_log.info("Abandoning clean after {retry_limit} seconds", retry_limit=retry_limit) return + def get_active_channels(self, recceiverid): + return self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)], 10000)) + + def clean_channels(self, owner, channels): + new_channels = [] + for ch in channels or []: + new_channels.append(ch[u'name']) + _log.info("Total channels to update: {nChannels}", nChannels=len(new_channels)) + _log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000)) + self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, + channelNames=new_channels) + def dict_to_file(dict, iocs, conf): filename = conf.get('debug_file_loc', None) @@ -544,10 +553,10 @@ def getCurrentTime(): return str(datetime.datetime.now()) -def prepareFindArgs(conf, args): - size = conf.get('findSizeLimit', 0) - if size > 0: - args.append(('~size', size)) +def prepareFindArgs(conf, args, size=0): + size_limit = conf.get('findSizeLimit', size) + if size_limit > 0: + args.append(('~size', size_limit)) return args From 65d2662c954c9b9bf63ed987e4e5e5ee4d06a77e Mon Sep 17 00:00:00 2001 From: skybrewer Date: Wed, 17 Jul 2024 14:24:24 +0200 Subject: [PATCH 2/2] Use findSizeLimit as size limit --- server/recceiver/cfstore.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 9db39f7..bb4b7d8 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -303,14 +303,14 @@ def clean_service(self): return def get_active_channels(self, recceiverid): - return self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)], 10000)) + return self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)])) def clean_channels(self, owner, channels): new_channels = [] for 
ch in channels or []: new_channels.append(ch[u'name']) _log.info("Total channels to update: {nChannels}", nChannels=len(new_channels)) - _log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000)) + _log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), self.conf["findSizeLimit"])) self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, channelNames=new_channels)