Skip to content

Commit

Permalink
raise invalidconfigexception. changed v3 client arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
Koos85 committed Sep 17, 2024
1 parent 15c6c45 commit ec6f83e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 106 deletions.
34 changes: 10 additions & 24 deletions asyncsnmplib/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Iterable, Optional, Tuple, List
from typing import Iterable, Optional, Tuple, List, Type
from .exceptions import (
SnmpNoConnection,
SnmpErrorNoSuchName,
Expand All @@ -10,8 +10,8 @@
from .package import SnmpMessage
from .pdu import SnmpGet, SnmpGetNext, SnmpGetBulk
from .protocol import SnmpProtocol
from .v3.auth import AUTH_PROTO
from .v3.encr import PRIV_PROTO
from .v3.auth import Auth
from .v3.encr import Priv
from .v3.package import SnmpV3Message
from .v3.protocol import SnmpV3Protocol

Expand Down Expand Up @@ -162,10 +162,8 @@ def __init__(
self,
host: str,
username: str,
auth_proto: str = 'USM_AUTH_NONE',
auth_passwd: Optional[str] = None,
priv_proto: str = 'USM_PRIV_NONE',
priv_passwd: Optional[str] = None,
auth: Optional[Tuple[Type[Auth], str]] = None,
priv: Optional[Tuple[Type[Priv], str]] = None,
port: int = 161,
max_rows: int = 10_000,
loop: Optional[asyncio.AbstractEventLoop] = None):
Expand All @@ -181,24 +179,12 @@ def __init__(
self._auth_hash_localized = None
self._priv_hash = None
self._priv_hash_localized = None
try:
self._auth_proto = AUTH_PROTO[auth_proto]
except KeyError:
raise Exception('Supply valid auth_proto')
try:
self._priv_proto = PRIV_PROTO[priv_proto]
except KeyError:
raise Exception('Supply valid priv_proto')
if self._priv_proto and not self._auth_proto:
raise Exception('Supply auth_proto')
if self._auth_proto:
if auth_passwd is None:
raise Exception('Supply auth_passwd')
if auth is not None:
self._auth_proto, auth_passwd = auth
self._auth_hash = self._auth_proto.hash_passphrase(auth_passwd)
if self._priv_proto:
if priv_passwd is None:
raise Exception('Supply priv_passwd')
self._priv_hash = self._auth_proto.hash_passphrase(priv_passwd)
if priv is not None:
self._priv_proto, priv_passwd = priv
self._priv_hash = self._auth_proto.hash_passphrase(priv_passwd)

# On some systems it seems to be required to set the remote_addr argument
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.create_datagram_endpoint
Expand Down
112 changes: 37 additions & 75 deletions asyncsnmplib/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import logging
from typing import Dict, List, Tuple
from .asn1 import TOid, TValue
from .client import Snmp, SnmpV1, SnmpV3
from .exceptions import SnmpException, SnmpNoConnection, SnmpNoAuthParams
from .mib.utils import on_result_base
from .v3.auth import AUTH_PROTO
from .v3.encr import PRIV_PROTO


class InvalidCredentialsException(SnmpException):
message = 'Invalid SNMP v3 credentials.'


class InvalidClientConfigException(SnmpException):
message = 'Invalid SNMP v3 client configuration.'


class InvalidSnmpVersionException(SnmpException):
message = 'Invalid SNMP version.'
class InvalidConfigException(SnmpException):
def __init__(self, message: str):
super().__init__(message)
self.message = message


class ParseResultException(SnmpException):
Expand All @@ -24,57 +20,10 @@ def __init__(self, message: str):
self.message = message


def snmpv3_credentials(config: dict):
try:
user_name = config['username']
except KeyError:
raise Exception(f'missing `username`')

auth = config.get('auth')
if auth is not None:
auth_type = auth.get('type', 'USM_AUTH_NONE')
if auth_type != 'USM_AUTH_NONE':
if auth_type not in AUTH_PROTO:
raise Exception(f'invalid `auth.type`')

try:
auth_passwd = auth['password']
except KeyError:
raise Exception(f'missing `auth.password`')

priv = config.get('priv', {})
priv_type = priv.get('type', 'USM_PRIV_NONE')
if priv_type != 'USM_PRIV_NONE':
if priv_type not in PRIV_PROTO:
raise Exception(f'invalid `priv.type`')

try:
priv_passwd = priv['password']
except KeyError:
raise Exception(f'missing `priv.password`')

return {
'username': user_name,
'auth_proto': auth_type,
'auth_passwd': auth_passwd,
'priv_proto': priv_type,
'priv_passwd': priv_passwd,
}
else:
return {
'username': user_name,
'auth_proto': auth_type,
'auth_passwd': auth_passwd,
}
return {
'username': user_name,
}


async def snmp_queries(
address: str,
config: dict,
queries: tuple):
queries: Tuple[TOid, ...]) -> Dict[str, List[Tuple[TOid, TValue]]]:

version = config.get('version', '2c')

Expand All @@ -83,38 +32,51 @@ async def snmp_queries(
if isinstance(community, dict):
community = community.get('secret')
if not isinstance(community, str):
raise TypeError('SNMP community must be a string.')
raise InvalidConfigException('`community` must be a string.')
cl = Snmp(
host=address,
community=community,
)
elif version == '3':
try:
cred = snmpv3_credentials(config)
except Exception as e:
logging.warning(f'invalid snmpv3 credentials {address}: {e}')
raise InvalidCredentialsException
try:
cl = SnmpV3(
host=address,
**cred,
)
except Exception as e:
logging.warning(f'invalid snmpv3 client config {address}: {e}')
raise InvalidClientConfigException
username = config.get('username')
if not isinstance(username, str):
raise InvalidConfigException('`username` must be a string.')
auth = config.get('auth')
if auth:
auth_proto = AUTH_PROTO.get(auth.get('type'))
auth_passwd = auth.get('password')
if auth_proto is None:
raise InvalidConfigException('`auth.type` invalid')
elif not isinstance(auth_passwd, str):
raise InvalidConfigException('`auth.password` must be string')
auth = (auth_proto, auth_passwd)
priv = auth and config.get('priv')
if priv:
priv_proto = PRIV_PROTO.get(priv.get('type'))
priv_passwd = priv.get('password')
if priv_proto is None:
raise InvalidConfigException('`priv.type` invalid')
elif not isinstance(priv_passwd, str):
raise InvalidConfigException('`priv.password` must be string')
priv = (priv, priv_passwd)
cl = SnmpV3(
host=address,
username=username,
auth=auth,
priv=priv,
)
elif version == '1':
community = config.get('community', 'public')
if isinstance(community, dict):
community = community.get('secret')
if not isinstance(community, str):
raise TypeError('SNMP community must be a string.')
raise InvalidConfigException('`community` must be a string.')
cl = SnmpV1(
host=address,
community=community,
)
else:
logging.warning(f'unsupported snmp version {address}: {version}')
raise InvalidSnmpVersionException
raise InvalidConfigException(f'unsupported snmp version {version}')

try:
await cl.connect()
Expand Down
14 changes: 10 additions & 4 deletions asyncsnmplib/v3/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import struct
from hashlib import md5, sha1
from typing import Callable, Type, Dict


def hash_passphrase(passphrase, hash_func):
Expand Down Expand Up @@ -71,20 +72,25 @@ def authenticate_sha(auth_key, msg):
return msg.replace(b'\x00' * 12, d2[:12], 1)


class USM_AUTH_HMAC96_MD5:
class Auth:
hash_passphrase: Callable
localize: Callable
auth: Callable


class USM_AUTH_HMAC96_MD5(Auth):
hash_passphrase = hash_passphrase_md5
localize = localize_key_md5
auth = authenticate_md5


class USM_AUTH_HMAC96_SHA:
class USM_AUTH_HMAC96_SHA(Auth):
hash_passphrase = hash_passphrase_sha
localize = localize_key_sha
auth = authenticate_sha


AUTH_PROTO = {
AUTH_PROTO: Dict[str, Type[Auth]] = {
'USM_AUTH_HMAC96_MD5': USM_AUTH_HMAC96_MD5,
'USM_AUTH_HMAC96_SHA': USM_AUTH_HMAC96_SHA,
'USM_AUTH_NONE': None,
}
10 changes: 7 additions & 3 deletions asyncsnmplib/v3/encr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from Crypto.Cipher import DES, AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad
from typing import Callable, Type, Dict


def encrypt_data(key, data, msgsecurityparams):
Expand Down Expand Up @@ -73,18 +74,21 @@ def decrypt_data_aes(key, data, msgsecurityparams):
return obj.decrypt(pad(data, 16))


class USM_PRIV_CBC56_DES:
class Priv:
encrypt: Callable


class USM_PRIV_CBC56_DES(Priv):
encrypt = encrypt_data
decrypt = decrypt_data


class USM_PRIV_CFB128_AES:
class USM_PRIV_CFB128_AES(Priv):
encrypt = encrypt_data_aes
decrypt = decrypt_data_aes


PRIV_PROTO = {
'USM_PRIV_CBC56_DES': USM_PRIV_CBC56_DES,
'USM_PRIV_CFB128_AES': USM_PRIV_CFB128_AES,
'USM_PRIV_NONE': None,
}

0 comments on commit ec6f83e

Please sign in to comment.