Skip to content

Commit

Permalink
Merge pull request #996 from samson0v/master
Browse files Browse the repository at this point in the history
Fixed data misplaced for OPC-UA AsyncIO Connector
  • Loading branch information
imbeacon authored Nov 16, 2022
2 parents 4f71dd9 + 67ce038 commit f8342c9
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@


class OpcUaConnectorAsyncIO(Connector, Thread):
DATA_TO_SEND = Queue(-1)
SUB_DATA_TO_CONVERT = Queue(-1)

def __init__(self, gateway, config, connector_type):
self.statistics = {'MessagesReceived': 0,
'MessagesSent': 0}
Expand All @@ -67,6 +64,9 @@ def __init__(self, gateway, config, connector_type):
else:
self.__opcua_url = self.__server_conf.get("url")

self.__data_to_send = Queue(-1)
self.__sub_data_to_convert = Queue(-1)

self.__loop = asyncio.new_event_loop()

self.__client = None
Expand Down Expand Up @@ -301,8 +301,8 @@ async def __scan_device_nodes(self):

def __convert_sub_data(self):
while not self.__stopped:
if not OpcUaConnectorAsyncIO.SUB_DATA_TO_CONVERT.empty():
sub_node, data = OpcUaConnectorAsyncIO.SUB_DATA_TO_CONVERT.get()
if not self.__sub_data_to_convert.empty():
sub_node, data = self.__sub_data_to_convert.get()

for device in self.__device_nodes:
for section in ('attributes', 'timeseries'):
Expand All @@ -313,7 +313,7 @@ def __convert_sub_data(self):
converter_data = device.converter_for_sub.get_data()

if converter_data:
OpcUaConnectorAsyncIO.DATA_TO_SEND.put(*converter_data)
self.__data_to_send.put(*converter_data)
device.converter_for_sub.clear_data()
else:
sleep(.2)
Expand Down Expand Up @@ -345,9 +345,11 @@ async def __poll_nodes(self):
value = await var.read_data_value()
device.converter.convert(config={'section': section, 'key': node['key']}, val=value)

if not self.__server_conf.get('disableSubscriptions', False) and not node.get('sub_on', False):
if not self.__server_conf.get('disableSubscriptions', False) and not node.get('sub_on',
False):
if self.__subscription is None:
self.__subscription = await self.__client.create_subscription(1, SubHandler())
self.__subscription = await self.__client.create_subscription(1, SubHandler(
self.__sub_data_to_convert))
handle = await self.__subscription.subscribe_data_change(var)
node['subscription'] = handle
node['sub_on'] = True
Expand All @@ -372,14 +374,14 @@ async def __poll_nodes(self):

converter_data = device.converter.get_data()
if converter_data:
OpcUaConnectorAsyncIO.DATA_TO_SEND.put(*converter_data)
self.__data_to_send.put(*converter_data)

device.converter.clear_data()

def __send_data(self):
while not self.__stopped:
if not OpcUaConnectorAsyncIO.DATA_TO_SEND.empty():
data = OpcUaConnectorAsyncIO.DATA_TO_SEND.get()
if not self.__data_to_send.empty():
data = self.__data_to_send.get()
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
self.__log.debug(data)
self.__gateway.send_to_storage(self.get_name(), data)
Expand Down Expand Up @@ -500,10 +502,12 @@ async def __call_method(self, path, arguments, result={}):


class SubHandler:
@staticmethod
def datachange_notification(node, _, data):
def __init__(self, queue):
self.__queue = queue

def datachange_notification(self, node, _, data):
log.debug("New data change event %s %s", node, data)
OpcUaConnectorAsyncIO.SUB_DATA_TO_CONVERT.put((node, data))
self.__queue.put((node, data))

@staticmethod
def event_notification(event):
Expand Down

0 comments on commit f8342c9

Please sign in to comment.