From ec6f83ee93fc2e77b5f669a2ae70e89e300505fe Mon Sep 17 00:00:00 2001 From: Koos85 Date: Tue, 17 Sep 2024 16:02:38 +0200 Subject: [PATCH] raise invalidconfigexception. changed v3 client arguments --- asyncsnmplib/client.py | 34 ++++-------- asyncsnmplib/utils.py | 112 +++++++++++++--------------------------- asyncsnmplib/v3/auth.py | 14 +++-- asyncsnmplib/v3/encr.py | 10 ++-- 4 files changed, 64 insertions(+), 106 deletions(-) diff --git a/asyncsnmplib/client.py b/asyncsnmplib/client.py index 06f7dbf..353c999 100644 --- a/asyncsnmplib/client.py +++ b/asyncsnmplib/client.py @@ -1,5 +1,5 @@ import asyncio -from typing import Iterable, Optional, Tuple, List +from typing import Iterable, Optional, Tuple, List, Type from .exceptions import ( SnmpNoConnection, SnmpErrorNoSuchName, @@ -10,8 +10,8 @@ from .package import SnmpMessage from .pdu import SnmpGet, SnmpGetNext, SnmpGetBulk from .protocol import SnmpProtocol -from .v3.auth import AUTH_PROTO -from .v3.encr import PRIV_PROTO +from .v3.auth import Auth +from .v3.encr import Priv from .v3.package import SnmpV3Message from .v3.protocol import SnmpV3Protocol @@ -162,10 +162,8 @@ def __init__( self, host: str, username: str, - auth_proto: str = 'USM_AUTH_NONE', - auth_passwd: Optional[str] = None, - priv_proto: str = 'USM_PRIV_NONE', - priv_passwd: Optional[str] = None, + auth: Optional[Tuple[Type[Auth], str]] = None, + priv: Optional[Tuple[Type[Priv], str]] = None, port: int = 161, max_rows: int = 10_000, loop: Optional[asyncio.AbstractEventLoop] = None): @@ -181,24 +179,12 @@ def __init__( self._auth_hash_localized = None self._priv_hash = None self._priv_hash_localized = None - try: - self._auth_proto = AUTH_PROTO[auth_proto] - except KeyError: - raise Exception('Supply valid auth_proto') - try: - self._priv_proto = PRIV_PROTO[priv_proto] - except KeyError: - raise Exception('Supply valid priv_proto') - if self._priv_proto and not self._auth_proto: - raise Exception('Supply auth_proto') - if self._auth_proto: - if auth_passwd is None: - raise Exception('Supply auth_passwd') + if auth is not None: + self._auth_proto, auth_passwd = auth self._auth_hash = self._auth_proto.hash_passphrase(auth_passwd) - if self._priv_proto: - if priv_passwd is None: - raise Exception('Supply priv_passwd') - self._priv_hash = self._auth_proto.hash_passphrase(priv_passwd) + if priv is not None: + self._priv_proto, priv_passwd = priv + self._priv_hash = self._auth_proto.hash_passphrase(priv_passwd) # On some systems it seems to be required to set the remote_addr argument # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.create_datagram_endpoint diff --git a/asyncsnmplib/utils.py b/asyncsnmplib/utils.py index b8e14e8..b9b8ed3 100644 --- a/asyncsnmplib/utils.py +++ b/asyncsnmplib/utils.py @@ -1,4 +1,6 @@ import logging +from typing import Dict, List, Tuple +from .asn1 import TOid, TValue from .client import Snmp, SnmpV1, SnmpV3 from .exceptions import SnmpException, SnmpNoConnection, SnmpNoAuthParams from .mib.utils import on_result_base @@ -6,16 +8,10 @@ from .v3.encr import PRIV_PROTO -class InvalidCredentialsException(SnmpException): - message = 'Invalid SNMP v3 credentials.' - - -class InvalidClientConfigException(SnmpException): - message = 'Invalid SNMP v3 client configuration.' - - -class InvalidSnmpVersionException(SnmpException): - message = 'Invalid SNMP version.' +class InvalidConfigException(SnmpException): + def __init__(self, message: str): + super().__init__(message) + self.message = message class ParseResultException(SnmpException): @@ -24,57 +20,10 @@ def __init__(self, message: str): self.message = message -def snmpv3_credentials(config: dict): - try: - user_name = config['username'] - except KeyError: - raise Exception(f'missing `username`') - - auth = config.get('auth') - if auth is not None: - auth_type = auth.get('type', 'USM_AUTH_NONE') - if auth_type != 'USM_AUTH_NONE': - if auth_type not in AUTH_PROTO: - raise Exception(f'invalid `auth.type`') - - try: - auth_passwd = auth['password'] - except KeyError: - raise Exception(f'missing `auth.password`') - - priv = config.get('priv', {}) - priv_type = priv.get('type', 'USM_PRIV_NONE') - if priv_type != 'USM_PRIV_NONE': - if priv_type not in PRIV_PROTO: - raise Exception(f'invalid `priv.type`') - - try: - priv_passwd = priv['password'] - except KeyError: - raise Exception(f'missing `priv.password`') - - return { - 'username': user_name, - 'auth_proto': auth_type, - 'auth_passwd': auth_passwd, - 'priv_proto': priv_type, - 'priv_passwd': priv_passwd, - } - else: - return { - 'username': user_name, - 'auth_proto': auth_type, - 'auth_passwd': auth_passwd, - } - return { - 'username': user_name, - } - - async def snmp_queries( address: str, config: dict, - queries: tuple): + queries: Tuple[TOid, ...]) -> Dict[str, List[Tuple[TOid, TValue]]]: version = config.get('version', '2c') @@ -83,38 +32,51 @@ async def snmp_queries( if isinstance(community, dict): community = community.get('secret') if not isinstance(community, str): - raise TypeError('SNMP community must be a string.') + raise InvalidConfigException('`community` must be a string.') cl = Snmp( host=address, community=community, ) elif version == '3': - try: - cred = snmpv3_credentials(config) - except Exception as e: - logging.warning(f'invalid snmpv3 credentials {address}: {e}') - raise InvalidCredentialsException - try: - cl = SnmpV3( - host=address, - **cred, - ) - except Exception as e: - logging.warning(f'invalid snmpv3 client config {address}: {e}') - raise InvalidClientConfigException + username = config.get('username') + if not isinstance(username, str): + raise InvalidConfigException('`username` must be a string.') + auth = config.get('auth') + if auth: + auth_proto = AUTH_PROTO.get(auth.get('type')) + auth_passwd = auth.get('password') + if auth_proto is None: + raise InvalidConfigException('`auth.type` invalid') + elif not isinstance(auth_passwd, str): + raise InvalidConfigException('`auth.password` must be string') + auth = (auth_proto, auth_passwd) + priv = auth and config.get('priv') + if priv: + priv_proto = PRIV_PROTO.get(priv.get('type')) + priv_passwd = priv.get('password') + if priv_proto is None: + raise InvalidConfigException('`priv.type` invalid') + elif not isinstance(priv_passwd, str): + raise InvalidConfigException('`priv.password` must be string') + priv = (priv, priv_passwd) + cl = SnmpV3( + host=address, + username=username, + auth=auth, + priv=priv, + ) elif version == '1': community = config.get('community', 'public') if isinstance(community, dict): community = community.get('secret') if not isinstance(community, str): - raise TypeError('SNMP community must be a string.') + raise InvalidConfigException('`community` must be a string.') cl = SnmpV1( host=address, community=community, ) else: - logging.warning(f'unsupported snmp version {address}: {version}') - raise InvalidSnmpVersionException + raise InvalidConfigException(f'unsupported snmp version {version}') try: await cl.connect() diff --git a/asyncsnmplib/v3/auth.py b/asyncsnmplib/v3/auth.py index 2fb8023..facba36 100644 --- a/asyncsnmplib/v3/auth.py +++ b/asyncsnmplib/v3/auth.py @@ -1,5 +1,6 @@ import struct from hashlib import md5, sha1 +from typing import Callable, Type, Dict def hash_passphrase(passphrase, hash_func): @@ -71,20 +72,25 @@ def authenticate_sha(auth_key, msg): return msg.replace(b'\x00' * 12, d2[:12], 1) -class USM_AUTH_HMAC96_MD5: +class Auth: + hash_passphrase: Callable + localize: Callable + auth: Callable + + +class USM_AUTH_HMAC96_MD5(Auth): hash_passphrase = hash_passphrase_md5 localize = localize_key_md5 auth = authenticate_md5 -class USM_AUTH_HMAC96_SHA: +class USM_AUTH_HMAC96_SHA(Auth): hash_passphrase = hash_passphrase_sha localize = localize_key_sha auth = authenticate_sha -AUTH_PROTO = { +AUTH_PROTO: Dict[str, Type[Auth]] = { 'USM_AUTH_HMAC96_MD5': USM_AUTH_HMAC96_MD5, 'USM_AUTH_HMAC96_SHA': USM_AUTH_HMAC96_SHA, - 'USM_AUTH_NONE': None, } diff --git a/asyncsnmplib/v3/encr.py b/asyncsnmplib/v3/encr.py index ba88191..6454279 100644 --- a/asyncsnmplib/v3/encr.py +++ b/asyncsnmplib/v3/encr.py @@ -3,6 +3,7 @@ from Crypto.Cipher import DES, AES from Crypto.Random import get_random_bytes from Crypto.Util.Padding import pad +from typing import Callable, Type, Dict def encrypt_data(key, data, msgsecurityparams): @@ -73,12 +74,16 @@ def decrypt_data_aes(key, data, msgsecurityparams): return obj.decrypt(pad(data, 16)) -class USM_PRIV_CBC56_DES: +class Priv: + encrypt: Callable + + +class USM_PRIV_CBC56_DES(Priv): encrypt = encrypt_data decrypt = decrypt_data -class USM_PRIV_CFB128_AES: +class USM_PRIV_CFB128_AES(Priv): encrypt = encrypt_data_aes decrypt = decrypt_data_aes @@ -86,5 +91,4 @@ class USM_PRIV_CFB128_AES: PRIV_PROTO = { 'USM_PRIV_CBC56_DES': USM_PRIV_CBC56_DES, 'USM_PRIV_CFB128_AES': USM_PRIV_CFB128_AES, - 'USM_PRIV_NONE': None, }