diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index 66881b7..85645f2 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -19,12 +19,13 @@ import time from concurrent.futures import Future from dataclasses import dataclass, field -from random import randint +from itertools import count from threading import Thread from typing import NamedTuple, cast from pika import SelectConnection, BasicProperties from pika.channel import Channel +from pika.exceptions import AMQPConnectionError, AMQPChannelError from pika.frame import Method from pika.spec import Basic @@ -64,6 +65,7 @@ class PublishConfirm: be closed, which usually are tied to permission related issues or socket timeouts. """ + # pylint: disable=too-many-instance-attributes def __init__(self, conn: Conn, exchange: Exch, queue: Queue): """Setup the example publisher object, passing in the RabbitMqUtils we will use to @@ -76,9 +78,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): a "private queue", i.e. not intended for consumers, and all published messages will have a 10-second TTL. """ - self._thread = Thread(name=f'PublishConfirm-{randint(0, 9)}', - daemon=True, - target=self._run) + self._thread: None | Thread = None # not initialized yet + self._next_thread_id = count(0).__next__ # start at 0, iterate each time Thread created self._connection: SelectConnection | None = None self._channel: Channel | None = None @@ -89,6 +90,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): self._records = PublishConfirmRecords() # data class to track message activity self._is_ready_future: Future | None = None + self._create_thread() + def publish_message(self, message: dict, routing_key='', @@ -110,6 +113,12 @@ def publish_message(self, Raises: RuntimeError: if channel is uninitialized (start() not completed yet) or is closed """ + properties = BasicProperties(content_type='application/json', + content_encoding='utf-8', + correlation_id=corr_id) + logger.info('Publishing message to queue %s, message length: %d', + self._rmq_params.queue.name, len(json.dumps(message))) + is_channel_ready = self._wait_for_channel_to_be_ready() if not is_channel_ready: logger.error('RabbitMQ channel not established for some reason. Cannnot publish') @@ -117,25 +126,39 @@ def publish_message(self, logger.debug('DEBUG: channel is ready to publish message') try: - properties = BasicProperties(content_type='application/json', - content_encoding='utf-8', - correlation_id=corr_id) - - logger.info('Publishing message to queue %s, message length: %d', - self._rmq_params.queue.name, len(json.dumps(message))) - self._channel.basic_publish(self._rmq_params.exchange.name, routing_key, + self._channel.basic_publish(self._rmq_params.exchange.name, + routing_key, json.dumps(message, ensure_ascii=True), properties) - self._records.message_number += 1 - self._records.deliveries[self._records.message_number] = message - logger.info('Published message # %i to exchange %s, queue %s, routing_key %s', - self._records.message_number, self._rmq_params.exchange.name, - self._rmq_params.queue.name, routing_key) - return True - - except Exception as e: # pylint: disable=broad-exception-caught - logger.error('Publish message problem : (%s) %s', type(e), str(e)) - return False + except (AMQPChannelError, AMQPConnectionError) as exc: + # something wrong with RabbitMQ connection; destroy and recreate the daemon Thread + logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s', + type(exc), str(exc)) + + # create new Thread, abandoning old one (it will shut itself down) + self._create_thread() + if not self._wait_for_channel_to_be_ready(): + logger.warning('Second attempt to connect to RabbitMQ failed. Cannnot publish') + return False + + try: + self._channel.basic_publish(self._rmq_params.exchange.name, + routing_key, + json.dumps(message, ensure_ascii=True), + properties) + except (AMQPChannelError, AMQPConnectionError) as retry_exc: + logger.error('Second attempt to publish message failed: (%s) %s', + type(retry_exc), str(retry_exc)) + return False + + # publish worked on the first (or second) try + self._records.message_number += 1 + self._records.deliveries[self._records.message_number] = message + logger.info('Published message # %i to exchange %s, queue %s, routing_key %s', + self._records.message_number, self._rmq_params.exchange.name, + self._rmq_params.queue.name, routing_key) + + return True def start(self, is_ready: Future | None = None): """ @@ -151,13 +174,13 @@ def start(self, is_ready: Future | None = None): RuntimeError: if PublishConfirm thread is already running """ logger.info('Starting thread with callback %s. is_alive? %s, self._channel: %s', - is_ready, self._thread.is_alive(), self._channel) + is_ready, self._is_running(), self._channel) if is_ready is not None: self._is_ready_future = is_ready # to be invoked after all pika setup is done # not possible to start Thread when it's already running - if self._thread.is_alive() or (self._connection is not None and self._connection.is_open): + if self._is_running() or self._is_connected(): raise RuntimeError('PublishConfirm thread already running, cannot be started') self._thread.start() @@ -170,15 +193,23 @@ def stop(self): Starting the IOLoop again will allow the publisher to cleanly disconnect from RabbitMQ. """ - logger.info('Stopping') + logger.info('Stopping Thread %s', self._thread and self._thread.name) self._stopping = True self._close_connection() self._stopping = False # done stopping + def _is_connected(self) -> bool: + """True if RabbitMQ Connection and Channel exist and are open""" + return (self._connection is not None and self._connection.is_open + and self._channel is not None and self._channel.is_open) + def _is_running(self) -> bool: + """True if PublishConfirm Thread exists and is running""" + return self._thread and self._thread.is_alive() def _run(self): """Run a new thread: get a new RMQ connection, and start looping until stop() is called""" + logger.debug('Creating connection') self._connection = self._create_connection() self._connection.ioloop.start() time.sleep(0.2) @@ -189,6 +220,19 @@ def _run(self): if self._connection is not None and not self._connection.is_closed: # Finish closing self._connection.ioloop.start() + logger.info('Thread %s finished stopping', self._thread.name) + + def _create_thread(self): + """ + Create new python Thread for this PublishConfirm, allowing any pre-existing Thread to stop + """ + if self._thread is not None: + self.stop() # halt previously existing Thread to be sure it exits eventually + self._records = PublishConfirmRecords() # reset delivery_tag and such + + self._thread = Thread(name=f'PublishConfirm-{self._next_thread_id()}', + daemon=True, + target=self._run) def _create_connection(self): """This method connects to RabbitMQ, returning the connection handle. @@ -224,8 +268,7 @@ def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool: logger.debug(self._channel) logger.debug('----------------------') - if (self._connection and self._connection.is_open - and self._channel and self._channel.is_open): + if self._is_connected(): return True # connection and channel already open, no setup needed logger.info('Channel is not ready to publish, calling start() now') @@ -389,11 +432,9 @@ def _on_bindok(self, _unused_frame: Method): self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message self._channel.confirm_delivery(self._on_delivery_confirmation) - # notify up that channel can now be published to + logger.debug('RabbitMQ channel ready, passing control back to caller') if self._is_ready_future: - self._is_ready_future.set_result(True) - - # self.schedule_next_message() + self._is_ready_future.set_result(True) # notify that channel can now be published to def _on_delivery_confirmation(self, method_frame: Method): """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC diff --git a/python/idsse_common/test/test_aws_utils.py b/python/idsse_common/test/test_aws_utils.py index caf8582..36564a3 100644 --- a/python/idsse_common/test/test_aws_utils.py +++ b/python/idsse_common/test/test_aws_utils.py @@ -87,7 +87,7 @@ def test_aws_ls_without_prepend_path(aws_utils: AwsUtils, mock_exec_cmd): mock_exec_cmd.assert_called_once() -def test_aws_ls_retries_with_s3_command_line(aws_utils: AwsUtils, monkeypatch: MonkeyPatch): +def test_aws_ls_retries_with_s3(aws_utils: AwsUtils, monkeypatch: MonkeyPatch): # fails first call, succeeds second call mock_exec_cmd_failure = Mock( side_effect=[FileNotFoundError, EXAMPLE_FILES]) @@ -99,7 +99,7 @@ def test_aws_ls_retries_with_s3_command_line(aws_utils: AwsUtils, monkeypatch: M assert mock_exec_cmd_failure.call_count == 2 -def test_aws_ls_returns_empty_array_on_error(aws_utils: AwsUtils, monkeypatch: MonkeyPatch): +def test_aws_ls_on_error(aws_utils: AwsUtils, monkeypatch: MonkeyPatch): mock_exec_cmd_failure = Mock(side_effect=PermissionError('No permissions')) monkeypatch.setattr('idsse.common.aws_utils.exec_cmd', mock_exec_cmd_failure) @@ -165,9 +165,7 @@ def test_get_issues_with_same_start_stop(aws_utils: AwsUtils, mock_exec_cmd): assert result[0] == EXAMPLE_ISSUE -def test_get_issues_latest_issue_from_today_if_no_args_passed(aws_utils: AwsUtils, - mock_exec_cmd): - +def test_get_issues_latest_issue_default_today(aws_utils: AwsUtils, mock_exec_cmd): result = aws_utils.get_issues() # with current mocks returned issue (latest issue) will always be "now" with # truncated minute, second, and microsecond diff --git a/python/idsse_common/test/test_publish_confirm.py b/python/idsse_common/test/test_publish_confirm.py index 9ac892f..02c00f6 100644 --- a/python/idsse_common/test/test_publish_confirm.py +++ b/python/idsse_common/test/test_publish_confirm.py @@ -12,16 +12,17 @@ # pylint: disable=missing-function-docstring,redefined-outer-name,invalid-name,protected-access # pylint: disable=too-few-public-methods,unused-argument -from time import sleep from collections.abc import Callable from concurrent.futures import Future -from copy import deepcopy -from typing import Any, NamedTuple, Self -from unittest.mock import Mock +from time import sleep +from typing import NamedTuple +from unittest.mock import Mock, PropertyMock from pytest import fixture, raises, MonkeyPatch -from pika.spec import Basic +from pika import SelectConnection +from pika.exceptions import AMQPConnectionError +from pika.spec import Basic, Channel from idsse.common.publish_confirm import PublishConfirm from idsse.common.rabbitmq_utils import Conn, Exch, Queue @@ -35,119 +36,112 @@ class Method(NamedTuple): method: Basic.Ack | Basic.Nack -class MockPika: +# fixtures +@fixture +def channel_state() -> dict: + """ + Track the simulated state of our mock Channel, so it can be initialized with + default values, read/written anywhere in a given test, and reset after each test finishes """ - Mock classes to imitate pika functionality, callbacks, etc. + return { + 'is_open': False, + 'delivery_tag': 0, # track messages that have been mock "sent" over our channel + } + + +@fixture +def mock_channel(channel_state: dict) -> Mock: + # set up complex pytest Mock object to emulate Channel + mock_obj = Mock(name='MockChannel', spec=Channel) + channel = mock_obj.return_value + channel.__int__= Mock(return_value=0) + + channel.is_open = PropertyMock(side_effect=lambda: channel_state['is_open']) + channel.is_closed = PropertyMock(side_effect=lambda: not channel_state['is_open']) + channel.exchange_declare.side_effect = ( + lambda exchange, exchange_type, callback: callback('userdata') + ) + channel.queue_declare.side_effect = ( + lambda queue, durable, arguments, exclusive, auto_delete, callback: callback(None) + ) + channel.queue_bind.side_effect = ( + lambda queue, exchange, routing_key, callback: callback(None) + ) + + def mock_confirm_delivery(callback): + method = Method(Basic.Ack(channel_state['delivery_tag'])) + channel_state['delivery_tag'] += 1 # our Mock needs to track this message ID as "sent" + callback(method) # send new Ack message back to PublishConfirm + channel.confirm_delivery.side_effect = mock_confirm_delivery - Note that classes here are reduced functionality by far; only properties/methods/interfaces - that exist here are the ones used by PublishConfirm (at the time tests were written) + def mock_basic_publish(exchange, key, body, properties): + channel_state['delivery_tag'] += 1 + channel.basic_publish.side_effect = mock_basic_publish + + def mock_close(): + channel_state['is_open'] = False + channel.close.side_effect = mock_close + + def mock_init(on_open_callback): + channel_state['is_open'] = True + channel_state['delivery_tag'] = 0 + on_open_callback(channel) + mock_obj.side_effect = mock_init + + return mock_obj + + +@fixture +def conn_state() -> dict: """ - def __init__(self): - self.delivery_tag = 0 # pseudo-global to track messages we have "sent" to our mock server - - class Channel: - """mock of pika.channel.Channel""" - def __init__(self): - self._context = MockPika() - self.channel_number = 0 - self.is_open = True - self.is_closed = False - - def __int__(self): - """Return int representation of channel""" - return self.channel_number - - def add_on_close_callback(self, callback): - pass - - def exchange_declare(self, exchange, exchange_type, callback: Callable[[Any, str], None]): - callback('userdata') - - # pylint: disable=too-many-arguments - def queue_declare( - self, queue, durable, arguments, exclusive, auto_delete, callback: Callable[[Any], None] - ): - callback(None) - - def queue_bind(self, queue, exchange, routing_key, callback: Callable[[Any], None]): - callback(None) # connection expected, but PublishConfirm doesn't actually use it - - def confirm_delivery(self, callback: Callable[[Method], None]): - """ - Args: - callback (Callable[[Method], None]) - """ - # may need to make this mockable in the future to pass Nack or customize delivery_tag - method = Method(Basic.Ack(delivery_tag=self._context.delivery_tag)) - self._context.delivery_tag += 1 # MockPika needs to track this message ID as "sent" - - callback(method) # send new Ack message back to PublishConfirm - - def basic_publish(self, exchange: str, key: str, body: str, properties): - self._context.delivery_tag += 1 - - def close(self): - self.is_open = False - self.is_closed = True - - - class IOLoop: - """mock of pika.SelectConnection.ioloop""" - def __init__( - self, - connection, - on_open: Callable[[Any], Any], - on_close: Callable[[Any, str], Any] - ): - self._connection = connection - self.on_open = on_open - self.on_close = on_close - - def start(self): - self.on_open(self._connection) - - def stop(self): - self.on_close(self._connection, 'some_reason') - - def call_later(self): - pass - - - class SelectConnection: - """mock of pika.SelectConnection""" - def __init__(self, - parameters, - on_open_callback: Callable[[Any], Self], - on_open_error_callback, - on_close_callback: Callable[[Any, str], Self]): - self.is_open = True - self.is_closed = False - self._context = MockPika() - - self.ioloop = self._context.IOLoop(self, on_open_callback, on_close_callback) - - def channel(self, on_open_callback: Callable[[Any], None]): - """ - Args: - on_open_callback (Callable[[MockPika.Channel], None]) - """ - on_open_callback(self._context.Channel()) - - def close(self): - self.is_open = False - self.is_closed = True + Track the simulated state of our mock Connection, so it can be initialized with + default values, read/written anywhere in a given test, and reset after each test finishes + """ + return { + 'is_open': False, + # save Connection's open/close callbacks when Connection is initialized, to be invoked + # when PublishConfirm.start/stop is called. These defaults should never run; + # should be overwritten with real callbacks inside PublishConfirm._create_connection() + 'on_open': lambda: RuntimeError('on_open_callback not passed to SelectConnection()'), + 'on_close': lambda: RuntimeError('on_open_callback not passed to SelectConnection()'), + } -# fixtures @fixture -def context() -> MockPika: - """Create an instance of our mocked pika library. delivery_tag initialized to 0""" - return MockPika() +def mock_connection( + monkeypatch: MonkeyPatch, + mock_channel: Mock, + conn_state: dict, + channel_state: dict +) -> Mock: + mock_obj = Mock(name="MockConnection", spec=SelectConnection) + + connection = mock_obj.return_value + connection.is_open = PropertyMock(side_effect=lambda: conn_state['is_open']) + connection.channel = mock_channel + + # pylint: disable=unnecessary-lambda + connection.ioloop.start.side_effect = lambda: conn_state['on_open']() + connection.ioloop.stop.side_effect = lambda: None + + def mock_close(): + conn_state['is_open'] = False + connection.close.side_effect = mock_close + + def mock_init(parameters, on_open_callback, on_open_error_callback, on_close_callback): + conn_state['is_open'] = True + conn_state['on_open'] = lambda: on_open_callback(connection) + conn_state['on_close'] = lambda: on_close_callback(connection, 'Closed by test') + return connection + + mock_obj.side_effect = mock_init + + monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', mock_obj) + return mock_obj @fixture -def publish_confirm(monkeypatch: MonkeyPatch, context: MockPika) -> PublishConfirm: - monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) +def publish_confirm(mock_connection: Mock) -> PublishConfirm: return PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) @@ -161,25 +155,27 @@ def test_publish_confirm_start_and_stop(publish_confirm: PublishConfirm): publish_confirm.stop() - assert publish_confirm._connection.is_closed - assert publish_confirm._channel.is_closed + assert publish_confirm._connection is None or publish_confirm._connection.is_closed + assert publish_confirm._channel is None or publish_confirm._channel.is_closed -def test_delivery_confirmation_handles_nack(publish_confirm: PublishConfirm, context: MockPika): - def mock_confirm_delivery(self: context.Channel, callback: Callable[[Method], None]): - method = Method(Basic.Nack(delivery_tag=context.delivery_tag)) - self._context.delivery_tag += 1 +def test_delivery_confirmation_handles_nack( + publish_confirm: PublishConfirm, mock_connection: Mock, channel_state: dict +): + def mock_confirm_delivery(callback: Callable[[Method], None]): + method = Method(Basic.Nack(channel_state['delivery_tag'])) + channel_state['delivery_tag'] += 1 callback(method) publish_confirm._records.deliveries[0] = 'Confirm.Select' - context.Channel.confirm_delivery = mock_confirm_delivery + mock_connection.return_value.channel.return_value.confirm_delivery = mock_confirm_delivery publish_confirm.start() assert publish_confirm._records.nacked == 1 assert publish_confirm._records.acked == 0 -def test_wait_for_channel_to_be_ready_timeout(publish_confirm: PublishConfirm, context: MockPika): +def test_wait_for_channel_to_be_ready_timeout(publish_confirm: PublishConfirm): # start() doesn't call its callback in time (at all), so timeout should expire publish_confirm.start = Mock(side_effect=lambda is_ready: None) @@ -192,8 +188,7 @@ def test_wait_for_channel_to_be_ready_timeout(publish_confirm: PublishConfirm, c publish_confirm.start = PublishConfirm.start -def test_publish_message_success_without_calling_start(monkeypatch: MonkeyPatch, context: MockPika): - monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) +def test_publish_message_success_without_calling_start(mock_connection: Mock): pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) example_message = {'data': [123]} @@ -207,12 +202,12 @@ def test_publish_message_success_without_calling_start(monkeypatch: MonkeyPatch, assert pub_conf._records.deliveries[1] == example_message -def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, context: MockPika): +def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, mock_connection: Mock): message_data = {'data': 123} + mock_connection.return_value.channel.return_value.basic_publish = Mock( + side_effect=AMQPConnectionError('ACCESS_REFUSED') + ) - publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock()) - publish_confirm._channel = context.Channel() - publish_confirm._channel.basic_publish = Mock(side_effect=RuntimeError('ACCESS_REFUSED')) success = publish_confirm.publish_message(message_data) # publish should have returned failure and not recorded a message delivery @@ -222,17 +217,36 @@ def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, cont # teardown our ad-hoc mocking of PublishConfirm instance publish_confirm.start = PublishConfirm.start + publish_confirm.stop() + +def test_publish_failure_restarts_thread(publish_confirm: PublishConfirm, mock_connection: Mock): + message_data = {'data': 123} + + # fail the first publish, succeed without incident on the second + mock_connection.return_value.channel.return_value.basic_publish = Mock( + side_effect=[AMQPConnectionError('ACCESS_REFUSED'), None] + ) + + initial_thread_name = publish_confirm._thread.name + success = publish_confirm.publish_message(message_data) + assert success + assert mock_connection.return_value.channel.return_value.basic_publish.call_count == 2 + assert publish_confirm._thread.name != initial_thread_name # should have new Thread -def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika): - publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock()) - publish_confirm._channel = context.Channel() - publish_confirm._channel.close() - publish_confirm._on_channel_closed(context.Channel(), 'ChannelClosedByClient') +def test_on_channel_closed(publish_confirm: PublishConfirm, mock_connection: Mock): + publish_confirm.start() + assert publish_confirm._channel.is_open + + channel = mock_connection.return_value.channel.return_value + publish_confirm._on_channel_closed(channel, 'ChannelClosedByClient') + assert publish_confirm._channel is None assert publish_confirm._connection.is_closed + publish_confirm.stop() # teardown + def test_start_with_future(publish_confirm: PublishConfirm): is_channel_ready = Future() @@ -240,23 +254,19 @@ def test_start_with_future(publish_confirm: PublishConfirm): # run test publish_confirm.start(is_channel_ready) - assert is_channel_ready.result(timeout=5) + assert is_channel_ready.result(timeout=2) # teardown publish_confirm.stop() -def test_start_future_raises_exception(monkeypatch: MonkeyPatch, context: MockPika): - # set up mock to fail RabbitMQ exchange declare step - monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) - pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) - - original_channel_class = context.Channel - mock_channel = deepcopy(context.Channel) - mock_channel.exchange_declare = Mock( +def test_exchange_failure_raises_exception(monkeypatch: MonkeyPatch, mock_connection: Mock): + # set up mock Channel that will fail on RabbitMQ exchange declare step + mock_connection.return_value.channel.return_value.exchange_declare = Mock( side_effect=ValueError('Precondition failed: exchange did not match') ) - context.Channel = mock_channel + + pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) # run test is_channel_ready = Future() @@ -264,9 +274,6 @@ def test_start_future_raises_exception(monkeypatch: MonkeyPatch, context: MockPi exc = is_channel_ready.exception() assert isinstance(exc, ValueError) and 'Precondition failed' in str(exc.args[0]) - # teardown hacky test mock - context.Channel = original_channel_class - def test_start_without_callback_sleeps(publish_confirm: PublishConfirm, monkeypatch: MonkeyPatch): def mock_sleep_function(secs: float): @@ -287,17 +294,16 @@ def mock_sleep_function(secs: float): assert set(sleep_call_args) in [set([(0.2,)]), set([(0.2,), (0.1,)])] -def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context: MockPika): - monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) - pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) +def test_wait_for_channel_returns_when_ready(publish_confirm: PublishConfirm): + publish_confirm._connection = None + publish_confirm._channel = None - assert pub_conf._channel is None - pub_conf._wait_for_channel_to_be_ready() - assert pub_conf._channel is not None and pub_conf._channel.is_open + is_ready = publish_confirm._wait_for_channel_to_be_ready() + assert is_ready + assert publish_confirm._channel is not None and publish_confirm._channel.is_open -def test_calling_start_twice_raises_error(monkeypatch: MonkeyPatch, context: MockPika): - monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) +def test_calling_start_twice_raises_error(mock_connection: Mock): pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) pub_conf.start()