Skip to content
This repository has been archived by the owner on May 27, 2024. It is now read-only.

Async #50

Merged
merged 8 commits into from
Aug 2, 2023
Merged

Async #50

merged 8 commits into from
Aug 2, 2023

Conversation

nekonekun
Copy link
Contributor

Async client is ready.
Client-related tests were copied and adapted to async.
No already existing files were touched, so it is safe to update.
asyncio.open_connection was introduced in python3.7, so python3.7+ is required to use async version.

@crisidev
Copy link
Collaborator

This is so cool. I'll spend some time reviewing this soon!

tacacs_plus/async_client.py Outdated Show resolved Hide resolved
Comment on lines 92 to 97
async def flow_control(self, reader=None, writer=None):
# we do not need to create new reader/writer instances
# if both are provided
existing = bool(reader and writer)
if existing:
yielded_reader, yielded_writer = reader, writer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand the need to pass the reader and writer to all methods. Is it to allow easier testing?

Since we have an object here, can't they become attribute of the object that will be lazily instantiated inside flow_control if they do not exist?

Copy link
Contributor Author

@nekonekun nekonekun Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it to allow easier testing?

Primarily, yes. Secondly, in case we have hundreds of requests, we can use already existing connection and save couple of seconds (moments? stress testing needed) on connection openings/closings.
To be more consistent with sync client, we can remove reader/writer params from AAA methods and create new instances on each request with no change in logic. But this params are last and optional, so not big deal.

can't they become attribute of the object

We cannot move reader/writer to object attributes, as we want to properly close connections/sockets, and we want to avoid race condition. Example:
We have two simultaneous requests -- name them (1) and (2).
(1) Client creates connection and instantiates reader/writer.
(1) Client sends packet.
(2) Client sends packet. No new connection here, as reader/writer already exist.
(1) Client receive response and close connection.
(2) ??? and we have problem here.

Copy link
Collaborator

@crisidev crisidev Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be more consistent with sync client, we can remove reader/writer params from AAA methods and create new instances on each request with no change in logic. But this params are last and optional, so not big deal.

I don't think creating new connections every time we run an operation is the right thing, it makes sense to have long lived reader and writer. I am just wondering if we can find a good way to keep the AAA methods with the same type of parameters and handle connections, which are at the end our state, internally.

In other asyncio client I have worked on, a pattern I found pretty neat is to have an async connect() method that creates the reader / writer and an async disconnect() that have to be called to properly close everything when done.

Something along these lines:

class TACACSClient(object):
    def __init__(self, ...):
        ...
        self._lock = asyncio.Lock()
        self._sock = None
        self._reader = None
        self._writer = None

    async def connect(self):
        ...
        async with self._lock:
            self._sock = socket.socket(self.family, socket.SOCK_STREAM)
            self._sock.settimeout(self.timeout)
            self._sock.connect(conn)
            self._reader, self._writer = await asyncio.open_connection(sock=sock)

    async def disconnect(self):
        async with self._lock:
            self._writer.close()
            await self.writer.wait_closed()
            self._sock.close()
            self._reader = None
            self._writer = None
            self._sock = None
    
    async def send(self, body, req_type, seq_no=1):
        if not self.reader or not self.writer:
            raise socket.error('async socket disconnected, the `connect()` method was not awaited')
        ...
        self._writer.write(bytes(packet))
        await self._writer.drain()
        ...  

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some investigation:

We actually need new connection on each request.

If we try to reuse connection:

  • In case of simultaneous requests: we got runtime error read() called while another coroutine is already waiting for incoming data, as multiple reader.read methods are called on same reader.
  • In case of one after another requests: server sends empty responses on all requests after first in same connection, at least my version of TACACS+ server behaves like this.

But it works well, if no reader/writer parameters were passed to AAA methods.

Solution:
Remove reader/writer parameters from AAA methods.
Think about tests.

Please accept my apologies, and i need couple of days to fix this.

Copy link
Collaborator

@crisidev crisidev Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to apologise, this is great! 😄 Thanks for the dive deep.

I think we can probably keep the socket as object attribute and create a new connection per request.

Testing can be achieved with a simple pytest fixture, something like this (not tested)

@pytest.fixture
def patch_connection(monkeypatch):
    async def open_connection(*args, **kwargs):
        return FakeReader(io.BytesIO(response_packets)) FakeWriter(io.BytesIO())
    monkeypatch.setattr(asyncio, 'open_connection', open_connection)

If you pass the fixture to your test, it should patch the open_connection method with the fake reader and writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still couldnt think of possibility to (1) keep socket as attribute and (2) properly connect/close socket.

asyncio.StreamReader and asyncio.StreamWriter are both binded to socket, thus for different requests we need different socket instances.

Previously suggested solution is implemented in fa4527c (thanks for monkeypatch example)

I am not sure about, and i have no chances to test asyncio.open_connection with IPv6 addresses. If it properly works -- we can completely get rid of socket object itself, and pass host-port directly to open_connection.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio.StreamReader and asyncio.StreamWriter are both binded to socket, thus for different requests we need different socket instances.

Are you really sure about this? It doesn't look right that I need a new socket per request, even if concurrency is in place.. Have you tried to just move the socket setup inside __init__()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move socket setup into __init__()

    def __init__(
        self,
        host,
        port,
        secret,
        timeout=10,
        session_id=None,
        family=socket.AF_INET,
        version_max=TAC_PLUS_MAJOR_VER,
        version_min=TAC_PLUS_MINOR_VER,
    ):
        self.host = host
        self.port = port
        self.secret = secret
        self.timeout = timeout
        self.version_max = version_max
        self.version_min = version_min
        self.family = family

        # session_id is an unsigned 32-bit int; unless it's provided, randomize
        self.session_id = session_id or random.randint(1, 2**32 - 1)
        if self.family == socket.AF_INET:
            self.conn = (self.host, self.port)
        else:
            # For AF_INET6 address family, a four-tuple (host, port,
            # flowinfo, scopeid) is used
            self.conn = (self.host, self.port, 0, 0)
        self.sock = socket.socket(self.family, socket.SOCK_STREAM)
        self.sock.settimeout(self.timeout)
        self.sock.connect(self.conn)

Modify decorator:

@contextlib.asynccontextmanager
async def flow_control(self):
    reader, writer = await asyncio.open_connection(sock=self.sock)
    yield reader, writer

Test run on production TACACS+ server

async def main():
    for i in range(2):
        print(await client.authenticate(username, password))

asyncio.run(main())

And it fails on second request

data: b'', data_len: 0, flags: 0, server_msg: b'', server_msg_len: 0, status: PASS
Traceback (most recent call last):
<traceback lines omitted>
ValueError: Unable to extract data from header. Likely the TACACS+ key does not match between server and client

If i place some debug prints i see that client receive empty response.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being this picky.. I am just trying to be sure we are doing the best thing and unfortunately I don't have a testing environment anymore.

I read the documentation of asyncio.open_connection and it seems like the initial idea of keeping the reader and writer as object attributes should work, but you tested and it does not :(

Let's merge the PR as it is, I think it is pretty good and I'll try to do some tests to understand if we can improve performances. Sounds good?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds awesome!

Comment on lines +216 to +236
if authen_type in (TAC_PLUS_AUTHEN_TYPE_PAP,
TAC_PLUS_AUTHEN_TYPE_CHAP):
self.version_min = TAC_PLUS_MINOR_VER_ONE

if authen_type == TAC_PLUS_AUTHEN_TYPE_PAP:
start_data = password.encode()

if authen_type == TAC_PLUS_AUTHEN_TYPE_CHAP:
if not isinstance(chap_ppp_id, str):
raise ValueError('chap_ppp_id must be a string')
if len(chap_ppp_id) != 1:
raise ValueError('chap_ppp_id must be a 1-byte string')
if not isinstance(chap_challenge, str):
raise ValueError('chap_challenge must be a string')
if len(chap_challenge) > 255:
raise ValueError('chap_challenge may not be more 255 bytes')

start_data = chap_ppp_id.encode()
start_data += chap_challenge.encode()
data_to_md5 = (chap_ppp_id + password + chap_challenge).encode()
start_data += md5(data_to_md5).digest()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this is not a blocker, but I am wondering if we can extract this logic since it is basically the same in the sync and async client..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to reuse this code.

Suggestion 1: extract __init__ and version methods from Client/AsyncClient to new BaseClient class, and add staticmethod to deal with authen data packing. I do not like this variant.

Suggestion 2: move code to new function in authentication.py, as it is related to authentication, then patch both sync and async clients.
Function signature be like (password, authen_type, chap_ppp_id, chap_challenge) -> bytes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def make_start_data(password, authen_type, chap_ppp_id, chap_challenge):
    if authen_type == TAC_PLUS_AUTHEN_TYPE_ASCII:
        return six.b('')
    elif authen_type == TAC_PLUS_AUTHEN_TYPE_PAP:
        return six.b(password)
    elif authen_type == TAC_PLUS_AUTHEN_TYPE_CHAP:
        if not isinstance(chap_ppp_id, six.string_types):
            raise ValueError('chap_ppp_id must be a string')
        if len(chap_ppp_id) != 1:
            raise ValueError('chap_ppp_id must be a 1-byte string')
        if not isinstance(chap_challenge, six.string_types):
            raise ValueError('chap_challenge must be a string')
        if len(chap_challenge) > 255:
            raise ValueError('chap_challenge may not be more 255 bytes')
        return (
                six.b(chap_ppp_id) +
                six.b(chap_challenge) +
                md5(six.b(
                    chap_ppp_id + password + chap_challenge
                )).digest()
        )

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a function will do without having to bring in a base class.. Anyway, I would defer this change to some refactoring I would like to do after this PR lands..

Co-authored-by: Matteo Bigoi <1781140+crisidev@users.noreply.github.com>
@crisidev crisidev self-assigned this Jul 31, 2023
Remove reader/writer parameters from AAA methods
Refactor flow_control decorator, as we are no more passing reader/writer instances
Refactor tests
@crisidev
Copy link
Collaborator

crisidev commented Aug 1, 2023

I skimmed through the latest changes on my phone and they look great.

Tomorrow I'll give it a more thorough review and merge!

@crisidev crisidev merged commit e84cb0d into ansible:master Aug 2, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants