Skip to content

Commit

Permalink
Bug: idsse-795: PublishConfirm retry connection (#68)
Browse files Browse the repository at this point in the history
* add timeouts to PublishConfirm connection
* add retry to publish_message()
* refactor test_publish_confirm to use sanctioned pytest fixtures
* reduce excessively long test names
  • Loading branch information
mackenzie-grimes-noaa committed Jul 22, 2024
1 parent 4b49c52 commit 84c03d6
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 184 deletions.
101 changes: 71 additions & 30 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
import time
from concurrent.futures import Future
from dataclasses import dataclass, field
from random import randint
from itertools import count
from threading import Thread
from typing import NamedTuple, cast

from pika import SelectConnection, BasicProperties
from pika.channel import Channel
from pika.exceptions import AMQPConnectionError, AMQPChannelError
from pika.frame import Method
from pika.spec import Basic

Expand Down Expand Up @@ -64,6 +65,7 @@ class PublishConfirm:
be closed, which usually are tied to permission related issues or
socket timeouts.
"""
# pylint: disable=too-many-instance-attributes

def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
"""Setup the example publisher object, passing in the RabbitMqUtils we will use to
Expand All @@ -76,9 +78,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
a "private queue", i.e. not intended for consumers, and all published messages
will have a 10-second TTL.
"""
self._thread = Thread(name=f'PublishConfirm-{randint(0, 9)}',
daemon=True,
target=self._run)
self._thread: None | Thread = None # not initialized yet
self._next_thread_id = count(0).__next__ # start at 0, iterate each time Thread created

self._connection: SelectConnection | None = None
self._channel: Channel | None = None
Expand All @@ -89,6 +90,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
self._records = PublishConfirmRecords() # data class to track message activity
self._is_ready_future: Future | None = None

self._create_thread()

def publish_message(self,
message: dict,
routing_key='',
Expand All @@ -110,32 +113,52 @@ def publish_message(self,
Raises:
RuntimeError: if channel is uninitialized (start() not completed yet) or is closed
"""
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
correlation_id=corr_id)
logger.info('Publishing message to queue %s, message length: %d',
self._rmq_params.queue.name, len(json.dumps(message)))

is_channel_ready = self._wait_for_channel_to_be_ready()
if not is_channel_ready:
logger.error('RabbitMQ channel not established for some reason. Cannnot publish')
return False

logger.debug('DEBUG: channel is ready to publish message')
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
correlation_id=corr_id)

logger.info('Publishing message to queue %s, message length: %d',
self._rmq_params.queue.name, len(json.dumps(message)))
self._channel.basic_publish(self._rmq_params.exchange.name, routing_key,
self._channel.basic_publish(self._rmq_params.exchange.name,
routing_key,
json.dumps(message, ensure_ascii=True),
properties)
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.info('Published message # %i to exchange %s, queue %s, routing_key %s',
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)
return True

except Exception as e: # pylint: disable=broad-exception-caught
logger.error('Publish message problem : (%s) %s', type(e), str(e))
return False
except (AMQPChannelError, AMQPConnectionError) as exc:
# something wrong with RabbitMQ connection; destroy and recreate the daemon Thread
logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s',
type(exc), str(exc))

# create new Thread, abandoning old one (it will shut itself down)
self._create_thread()
if not self._wait_for_channel_to_be_ready():
logger.warning('Second attempt to connect to RabbitMQ failed. Cannnot publish')
return False

try:
self._channel.basic_publish(self._rmq_params.exchange.name,
routing_key,
json.dumps(message, ensure_ascii=True),
properties)
except (AMQPChannelError, AMQPConnectionError) as retry_exc:
logger.error('Second attempt to publish message failed: (%s) %s',
type(retry_exc), str(retry_exc))
return False

# publish worked on the first (or second) try
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.info('Published message # %i to exchange %s, queue %s, routing_key %s',
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)

return True

def start(self, is_ready: Future | None = None):
"""
Expand All @@ -151,13 +174,13 @@ def start(self, is_ready: Future | None = None):
RuntimeError: if PublishConfirm thread is already running
"""
logger.info('Starting thread with callback %s. is_alive? %s, self._channel: %s',
is_ready, self._thread.is_alive(), self._channel)
is_ready, self._is_running(), self._channel)

if is_ready is not None:
self._is_ready_future = is_ready # to be invoked after all pika setup is done

# not possible to start Thread when it's already running
if self._thread.is_alive() or (self._connection is not None and self._connection.is_open):
if self._is_running() or self._is_connected():
raise RuntimeError('PublishConfirm thread already running, cannot be started')

self._thread.start()
Expand All @@ -170,15 +193,23 @@ def stop(self):
Starting the IOLoop again will allow the publisher to cleanly
disconnect from RabbitMQ.
"""
logger.info('Stopping')
logger.info('Stopping Thread %s', self._thread and self._thread.name)
self._stopping = True
self._close_connection()
self._stopping = False # done stopping

def _is_connected(self) -> bool:
    """Report whether both the RabbitMQ Connection and its Channel exist and are open.

    The Connection is checked first; the Channel is only inspected once the Connection
    is known to be present and open.
    """
    connection_open = self._connection is not None and self._connection.is_open
    if not connection_open:
        # no usable connection, so the state of the channel is irrelevant
        return connection_open

    return self._channel is not None and self._channel.is_open

def _is_running(self) -> bool:
    """True if the PublishConfirm Thread exists and is alive.

    Returns:
        bool: False when no Thread has been created yet, otherwise the result of
            the Thread's ``is_alive()``.
    """
    # Explicit None check: `self._thread and ...` would hand back None (not False)
    # when no Thread exists, contradicting the declared bool return type.
    return self._thread is not None and self._thread.is_alive()

def _run(self):
"""Run a new thread: get a new RMQ connection, and start looping until stop() is called"""
logger.debug('Creating connection')
self._connection = self._create_connection()
self._connection.ioloop.start()
time.sleep(0.2)
Expand All @@ -189,6 +220,19 @@ def _run(self):
if self._connection is not None and not self._connection.is_closed:
# Finish closing
self._connection.ioloop.start()
logger.info('Thread %s finished stopping', self._thread.name)

def _create_thread(self):
    """
    Build a fresh (not yet started) daemon Thread for this PublishConfirm.

    If a Thread was already assigned, stop() is invoked first so the previous Thread can
    wind down, and the message tracking records are replaced with a new, empty instance.
    The new Thread is named with the next value from the instance's thread id counter
    and targets _run().
    """
    replacing_existing = self._thread is not None
    if replacing_existing:
        # halt previously existing Thread to be sure it exits eventually
        self.stop()
        # start message tracking over for the new connection
        self._records = PublishConfirmRecords()

    thread_name = f'PublishConfirm-{self._next_thread_id()}'
    self._thread = Thread(target=self._run, name=thread_name, daemon=True)

def _create_connection(self):
"""This method connects to RabbitMQ, returning the connection handle.
Expand Down Expand Up @@ -224,8 +268,7 @@ def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool:
logger.debug(self._channel)
logger.debug('----------------------')

if (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
if self._is_connected():
return True # connection and channel already open, no setup needed

logger.info('Channel is not ready to publish, calling start() now')
Expand Down Expand Up @@ -389,11 +432,9 @@ def _on_bindok(self, _unused_frame: Method):
self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message
self._channel.confirm_delivery(self._on_delivery_confirmation)

# notify up that channel can now be published to
logger.debug('RabbitMQ channel ready, passing control back to caller')
if self._is_ready_future:
self._is_ready_future.set_result(True)

# self.schedule_next_message()
self._is_ready_future.set_result(True) # notify that channel can now be published to

def _on_delivery_confirmation(self, method_frame: Method):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
Expand Down
8 changes: 3 additions & 5 deletions python/idsse_common/test/test_aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def test_aws_ls_without_prepend_path(aws_utils: AwsUtils, mock_exec_cmd):
mock_exec_cmd.assert_called_once()


def test_aws_ls_retries_with_s3_command_line(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
def test_aws_ls_retries_with_s3(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
# fails first call, succeeds second call
mock_exec_cmd_failure = Mock(
side_effect=[FileNotFoundError, EXAMPLE_FILES])
Expand All @@ -99,7 +99,7 @@ def test_aws_ls_retries_with_s3_command_line(aws_utils: AwsUtils, monkeypatch: M
assert mock_exec_cmd_failure.call_count == 2


def test_aws_ls_returns_empty_array_on_error(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
def test_aws_ls_on_error(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
mock_exec_cmd_failure = Mock(side_effect=PermissionError('No permissions'))
monkeypatch.setattr('idsse.common.aws_utils.exec_cmd',
mock_exec_cmd_failure)
Expand Down Expand Up @@ -165,9 +165,7 @@ def test_get_issues_with_same_start_stop(aws_utils: AwsUtils, mock_exec_cmd):
assert result[0] == EXAMPLE_ISSUE


def test_get_issues_latest_issue_from_today_if_no_args_passed(aws_utils: AwsUtils,
mock_exec_cmd):

def test_get_issues_latest_issue_default_today(aws_utils: AwsUtils, mock_exec_cmd):
result = aws_utils.get_issues()
# with current mocks returned issue (latest issue) will always be "now" with
# truncated minute, second, and microsecond
Expand Down
Loading

0 comments on commit 84c03d6

Please sign in to comment.