Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated SNMP Connector #1003

Merged
merged 1 commit into from
Nov 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 111 additions & 95 deletions thingsboard_gateway/connectors/snmp/snmp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from random import choice
from re import search
from socket import gethostbyname
Expand All @@ -23,12 +24,22 @@
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.tb_utility import TBUtility

# Try import Pymodbus library or install it and import
installation_required = False

try:
import puresnmp
from puresnmp import __version__ as pymodbus_version

if int(pymodbus_version.split('.')[0]) < 2:
installation_required = True
except ImportError:
TBUtility.install_package("puresnmp")
import puresnmp
installation_required = True

if installation_required:
print("Modbus library not found - installing...")
TBUtility.install_package("puresnmp", ">=2.0.0")

from puresnmp import Client, credentials, PyWrapper
from puresnmp.exc import Timeout as SNMPTimeoutException


Expand All @@ -48,10 +59,13 @@ def __init__(self, gateway, config, connector_type):
self._default_converters = {
"uplink": "SNMPUplinkConverter",
"downlink": "SNMPDownlinkConverter"
}
self.__methods = ["get", "multiget", "getnext", "multigetnext", "walk", "multiwalk", "set", "multiset", "bulkget", "bulkwalk", "table", "bulktable"]
}
self.__methods = ["get", "multiget", "getnext", "walk", "multiwalk", "set", "multiset",
"bulkget", "bulkwalk", "table", "bulktable"]
self.__datatypes = ('attributes', 'telemetry')

self.__loop = asyncio.new_event_loop()

def open(self):
self.__stopped = False
self.__fill_converters()
Expand All @@ -60,22 +74,25 @@ def open(self):
def run(self):
self._connected = True
try:
while not self.__stopped:
current_time = time() * 1000
for device in self.__devices:
try:
if device.get("previous_poll_time", 0) + device.get("pollPeriod", 10000) < current_time:
self.__process_data(device)
device["previous_poll_time"] = current_time
except Exception as e:
log.exception(e)
if self.__stopped:
break
else:
sleep(.2)
self.__loop.run_until_complete(self._run())
except Exception as e:
log.exception(e)

async def _run(self):
while not self.__stopped:
current_time = time() * 1000
for device in self.__devices:
try:
if device.get("previous_poll_time", 0) + device.get("pollPeriod", 10000) < current_time:
await self.__process_data(device)
device["previous_poll_time"] = current_time
except Exception as e:
log.exception(e)
if self.__stopped:
break
else:
sleep(.2)

def close(self):
self.__stopped = True
self._connected = False
Expand All @@ -86,47 +103,12 @@ def get_name(self):
def is_connected(self):
return self._connected

def on_attributes_update(self, content):
try:
for device in self.__devices:
if content["device"] == device["deviceName"]:
for attribute_request_config in device["attributeUpdateRequests"]:
for attribute, value in content["data"]:
if search(attribute, attribute_request_config["attributeFilter"]):
common_parameters = self.__get_common_parameters(device)
result = self.__process_methods(attribute_request_config["method"], common_parameters,
{**attribute_request_config, "value": value})
log.debug("Received attribute update request for device \"%s\" with attribute \"%s\" and value \"%s\"", content["device"],
attribute)
log.debug(result)
log.debug(content)
except Exception as e:
log.exception(e)

def server_side_rpc_handler(self, content):
try:
for device in self.__devices:
if content["device"] == device["deviceName"]:
for rpc_request_config in device["serverSideRpcRequests"]:
if search(content["data"]["method"], rpc_request_config["requestFilter"]):
common_parameters = self.__get_common_parameters(device)
result = self.__process_methods(rpc_request_config["method"], common_parameters,
{**rpc_request_config, "value": content["data"]["params"]})
log.debug("Received RPC request for device \"%s\" with command \"%s\" and value \"%s\"", content["device"],
content["data"]["method"])
log.debug(result)
log.debug(content)
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"], content=result)
except Exception as e:
log.exception(e)
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"], success_sent=False)

def collect_statistic_and_send(self, connector_name, data):
self.statistics["MessagesReceived"] = self.statistics["MessagesReceived"] + 1
self.__gateway.send_to_storage(connector_name, data)
self.statistics["MessagesSent"] = self.statistics["MessagesSent"] + 1

def __process_data(self, device):
async def __process_data(self, device):
common_parameters = self.__get_common_parameters(device)
converted_data = {}
for datatype in self.__datatypes:
Expand All @@ -141,10 +123,11 @@ def __process_data(self, device):
method = method.lower()
if method not in self.__methods:
log.error("Unknown method: %s, configuration is: %r", method, datatype_config)
response = self.__process_methods(method, common_parameters, datatype_config)
response = await self.__process_methods(method, common_parameters, datatype_config)
converted_data.update(**device["uplink_converter"].convert((datatype, datatype_config), response))
except SNMPTimeoutException:
log.error("Timeout exception on connection to device \"%s\" with ip: \"%s\"", device["deviceName"], device["ip"])
log.error("Timeout exception on connection to device \"%s\" with ip: \"%s\"", device["deviceName"],
device["ip"])
return
except Exception as e:
log.exception(e)
Expand All @@ -153,89 +136,82 @@ def __process_data(self, device):
self.collect_statistic_and_send(self.get_name(), converted_data)

@staticmethod
def __process_methods(method, common_parameters, datatype_config):
async def __process_methods(method, common_parameters, datatype_config):
client = Client(ip=common_parameters['ip'],
port=common_parameters['port'],
credentials=credentials.V1(common_parameters['community']))
client.configure(timeout=common_parameters['timeout'])
client = PyWrapper(client)

response = None

if method == "get":
oid = datatype_config["oid"]
response = puresnmp.get(**common_parameters,
oid=oid)
response = await client.get(oid=oid)
elif method == "multiget":
oids = datatype_config["oid"]
oids = oids if isinstance(oids, list) else list(oids)
response = puresnmp.multiget(**common_parameters,
oids=oids)
response = await client.multiget(oids=oids)
elif method == "getnext":
oid = datatype_config["oid"]
master_response = puresnmp.getnext(**common_parameters,
oid=oid)
master_response = await client.getnext(oid=oid)
response = {master_response.oid: master_response.value}
elif method == "multigetnext":
oids = datatype_config["oid"]
oids = oids if isinstance(oids, list) else list(oids)
master_response = puresnmp.multigetnext(**common_parameters,
oids=oids)
response = {binded_var.oid: binded_var.value for binded_var in master_response}
elif method == "walk":
oid = datatype_config["oid"]
response = {binded_var.oid: binded_var.value for binded_var in list(puresnmp.walk(**common_parameters,
oid=oid))}
response = {}
async for binded_var in client.walk(oid=oid):
response[binded_var.oid] = binded_var.value
elif method == "multiwalk":
oids = datatype_config["oid"]
oids = oids if isinstance(oids, list) else list(oids)
response = {binded_var.oid: binded_var.value for binded_var in list(puresnmp.multiwalk(**common_parameters,
oids=oids))}
response = {}
async for binded_var in client.multiwalk(oids=oids):
response[binded_var.oid] = binded_var.value
elif method == "set":
oid = datatype_config["oid"]
value = datatype_config["value"]
response = puresnmp.set(**common_parameters,
oid=oid,
value=value)
response = await client.set(oid=oid, value=value)
elif method == "multiset":
mappings = datatype_config["mappings"]
response = puresnmp.multiset(**common_parameters,
mappings=mappings)
response = await client.multiset(mappings=mappings)
elif method == "bulkget":
scalar_oids = datatype_config.get("scalarOid", [])
scalar_oids = scalar_oids if isinstance(scalar_oids, list) else list(scalar_oids)
repeating_oids = datatype_config.get("repeatingOid", [])
repeating_oids = repeating_oids if isinstance(repeating_oids, list) else list(repeating_oids)
max_list_size = datatype_config.get("maxListSize", 1)
response = puresnmp.bulkget(**common_parameters,
scalar_oids=scalar_oids,
repeating_oids=repeating_oids,
max_list_size=max_list_size)._asdict()
response = await client.bulkget(scalar_oids=scalar_oids, repeating_oids=repeating_oids,
max_list_size=max_list_size)
response = response.scalars
elif method == "bulkwalk":
oids = datatype_config["oid"]
oids = oids if isinstance(oids, list) else list(oids)
bulk_size = datatype_config.get("bulkSize", 10)
response = {binded_var.oid: binded_var.value for binded_var in list(puresnmp.bulkwalk(**common_parameters,
bulk_size=bulk_size,
oids=oids))}
response = {}
async for binded_var in client.bulkwalk(bulk_size=bulk_size, oids=oids):
response[binded_var.oid] = binded_var.value
elif method == "table":
oid = datatype_config["oid"]
del common_parameters["timeout"]
num_base_nodes = datatype_config.get("numBaseNodes", 0)
response = puresnmp.table(**common_parameters,
oid=oid,
num_base_nodes=num_base_nodes)
response = await client.table(oid=oid)
elif method == "bulktable":
oid = datatype_config["oid"]
num_base_nodes = datatype_config.get("numBaseNodes", 0)
bulk_size = datatype_config.get("bulkSize", 10)
response = puresnmp.bulktable(**common_parameters,
oid=oid,
num_base_nodes=num_base_nodes,
bulk_size=bulk_size)
response = await client.bulktable(oid=oid, bulk_size=bulk_size)
else:
log.error("Method \"%s\" - Not found", str(method))
return response

def __fill_converters(self):
try:
for device in self.__devices:
device["uplink_converter"] = TBModuleLoader.import_module("snmp", device.get('converter', self._default_converters["uplink"]))(device)
device["downlink_converter"] = TBModuleLoader.import_module("snmp", device.get('converter', self._default_converters["downlink"]))(device)
device["uplink_converter"] = TBModuleLoader.import_module("snmp", device.get('converter',
self._default_converters[
"uplink"]))(device)
device["downlink_converter"] = TBModuleLoader.import_module("snmp", device.get('converter',
self._default_converters[
"downlink"]))(device)
except Exception as e:
log.exception(e)

Expand All @@ -246,3 +222,43 @@ def __get_common_parameters(device):
"timeout": device.get("timeout", 6),
"community": device["community"],
}

def on_attributes_update(self, content):
try:
for device in self.__devices:
if content["device"] == device["deviceName"]:
for attribute_request_config in device["attributeUpdateRequests"]:
for attribute, value in content["data"]:
if search(attribute, attribute_request_config["attributeFilter"]):
common_parameters = self.__get_common_parameters(device)
result = self.__process_methods(attribute_request_config["method"], common_parameters,
{**attribute_request_config, "value": value})
log.debug(
"Received attribute update request for device \"%s\" "
"with attribute \"%s\" and value \"%s\"",
content["device"],
attribute)
log.debug(result)
log.debug(content)
except Exception as e:
log.exception(e)

def server_side_rpc_handler(self, content):
try:
for device in self.__devices:
if content["device"] == device["deviceName"]:
for rpc_request_config in device["serverSideRpcRequests"]:
if search(content["data"]["method"], rpc_request_config["requestFilter"]):
common_parameters = self.__get_common_parameters(device)
result = self.__process_methods(rpc_request_config["method"], common_parameters,
{**rpc_request_config, "value": content["data"]["params"]})
log.debug("Received RPC request for device \"%s\" with command \"%s\" and value \"%s\"",
content["device"],
content["data"]["method"])
log.debug(result)
log.debug(content)
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
content=result)
except Exception as e:
log.exception(e)
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"], success_sent=False)