Skip to content

Commit

Permalink
Added invalidating of cached shared attributes for device
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Feb 5, 2025
1 parent e1f027b commit 2192da1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
41 changes: 28 additions & 13 deletions thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ def _watchers(self):

if not self.tb_client.is_connected() and self.__subscribed_to_rpc_topics:
self.__subscribed_to_rpc_topics = False
self.__devices_shared_attributes = {}

if (not self.tb_client.is_connected()
and self.__remote_configurator is not None
Expand Down Expand Up @@ -985,6 +986,14 @@ def __update_connector_devices(self, connector):
(device['connector'].name == connector.name or device['connector'].get_id() == connector.get_id())):
self.update_device(device_name, 'connector', connector)

def clean_shared_attributes_cache_for_connector_devices(self, connector):
connector_devices = self.__get_connector_devices(connector)
for device_name in connector_devices:
self.__devices_shared_attributes.pop(device_name, None)

def __get_connector_devices(self, connector):
return [device_name for device_name, device in self.__connected_devices.items() if device.get('connector') and device['connector'].get_id() == connector.get_id()]

def __cleanup_connectors(self):
self.available_connectors_by_id = {connector_id: connector for (connector_id, connector) in
self.available_connectors_by_id.items() if not connector.is_stopped()}
Expand Down Expand Up @@ -1799,11 +1808,15 @@ def add_device_async(self, data):

def add_device(self, device_name, content, device_type=None):
if self.tb_client is None or not self.tb_client.is_connected():
self.__devices_shared_attributes = {}
return False

device_type = device_type if device_type is not None else 'default'
if (device_name in self.__connected_devices or
TBUtility.get_dict_key_by_value(self.__renamed_devices, device_name) is not None):
if self.__sync_devices_shared_attributes_on_connect and hasattr(content['connector'],
'get_device_shared_attributes_keys'):
self.__sync_device_shared_attrs_queue.put((device_name, content['connector']))
return True

self.__connected_devices[device_name] = {**content, DEVICE_TYPE_PARAMETER: device_type}
Expand Down Expand Up @@ -1842,19 +1855,19 @@ def __sync_device_shared_attrs_loop(self):
sleep(.1)

def __process_sync_device_shared_attrs(self, device_name, connector):
shared_attributes = connector.get_device_shared_attributes_keys(device_name)
if device_name in self.__devices_shared_attributes:
device_shared_attrs = self.__devices_shared_attributes.get(device_name)
shared_attributes_request = {
'device': device_name,
'data': device_shared_attrs
}
# TODO: request shared attributes on init for all configured devices simultaneously to synchronize shared attributes
connector.on_attributes_update(shared_attributes_request)
else:
self.tb_client.client.gw_request_shared_attributes(device_name,
shared_attributes,
(self._attribute_update_callback, shared_attributes))
shared_attributes = connector.get_device_shared_attributes_keys(device_name)
if device_name in self.__devices_shared_attributes:
device_shared_attrs = self.__devices_shared_attributes.get(device_name)
shared_attributes_request = {
'device': device_name,
'data': device_shared_attrs
}
# TODO: request shared attributes on init for all configured devices simultaneously to synchronize shared attributes # noqa
connector.on_attributes_update(shared_attributes_request)
else:
self.tb_client.client.gw_request_shared_attributes(device_name,
shared_attributes,
(self._attribute_update_callback, shared_attributes))

def update_device(self, device_name, event, content: Connector):
should_save = False
Expand Down Expand Up @@ -1895,6 +1908,8 @@ def del_device(self, device_name):
self.__saved_devices.pop(device_name, None)
self.__added_devices.pop(device_name, None)
self.__save_persistent_devices()
if device_name in self.__devices_shared_attributes:
self.__devices_shared_attributes.pop(device_name, None)

def get_report_strategy_service(self):
return self._report_strategy_service
Expand Down
12 changes: 8 additions & 4 deletions thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,19 +671,23 @@ def _handle_connector_configuration_update(self, config):

if connector_configuration.get('id') in self._gateway.available_connectors_by_id:
try:
retrieved_connector = self._gateway.available_connectors_by_id[connector_configuration['id']]
close_start = monotonic()
while not self._gateway.available_connectors_by_id[connector_configuration['id']].is_stopped(): # noqa
self._gateway.available_connectors_by_id[connector_configuration['id']].close()
while not retrieved_connector.is_stopped(): # noqa
retrieved_connector.close()
self._gateway.clean_shared_attributes_cache_for_connector_devices(retrieved_connector)
if monotonic() - close_start > 5:
self.__log.error('Connector %s not stopped in 5 seconds', connector_configuration['id']) # noqa
break
except Exception as e:
self.__log.exception("Exception on closing connector occurred:", exc_info=e)
elif connector_configuration.get('name') in self._gateway.available_connectors_by_name:
try:
retrieved_connector = self._gateway.available_connectors_by_name[connector_configuration['name']]
close_start = monotonic()
while not self._gateway.available_connectors_by_name[connector_configuration['name']].is_stopped(): # noqa
self._gateway.available_connectors_by_name[connector_configuration['name']].close()
while not retrieved_connector.is_stopped():
retrieved_connector.close()
self._gateway.clean_shared_attributes_cache_for_connector_devices(retrieved_connector)
if monotonic() - close_start > 5:
self.__log.error('Connector %s not stopped in 5 seconds',
connector_configuration['name'])
Expand Down

0 comments on commit 2192da1

Please sign in to comment.