Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: idsse-795: PublishConfirm retry connection #68

Merged
merged 7 commits into from
Jul 22, 2024
101 changes: 71 additions & 30 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
import time
from concurrent.futures import Future
from dataclasses import dataclass, field
from random import randint
from itertools import count
from threading import Thread
from typing import NamedTuple, cast

from pika import SelectConnection, BasicProperties
from pika.channel import Channel
from pika.exceptions import AMQPConnectionError, AMQPChannelError
from pika.frame import Method
from pika.spec import Basic

Expand Down Expand Up @@ -64,6 +65,7 @@ class PublishConfirm:
be closed, which usually are tied to permission related issues or
socket timeouts.
"""
# pylint: disable=too-many-instance-attributes

def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
"""Setup the example publisher object, passing in the RabbitMqUtils we will use to
Expand All @@ -76,9 +78,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
a "private queue", i.e. not intended for consumers, and all published messages
will have a 10-second TTL.
"""
self._thread = Thread(name=f'PublishConfirm-{randint(0, 9)}',
daemon=True,
target=self._run)
self._thread: None | Thread = None # not initialized yet
self._next_thread_id = count(0).__next__ # start at 0, iterate each time Thread created

self._connection: SelectConnection | None = None
self._channel: Channel | None = None
Expand All @@ -89,6 +90,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
self._records = PublishConfirmRecords() # data class to track message activity
self._is_ready_future: Future | None = None

self._create_thread()

def publish_message(self,
message: dict,
routing_key='',
Expand All @@ -110,32 +113,52 @@ def publish_message(self,
Raises:
RuntimeError: if channel is uninitialized (start() not completed yet) or is closed
"""
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
correlation_id=corr_id)
logger.info('Publishing message to queue %s, message length: %d',
self._rmq_params.queue.name, len(json.dumps(message)))

is_channel_ready = self._wait_for_channel_to_be_ready()
if not is_channel_ready:
logger.error('RabbitMQ channel not established for some reason. Cannnot publish')
return False

logger.debug('DEBUG: channel is ready to publish message')
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
correlation_id=corr_id)

logger.info('Publishing message to queue %s, message length: %d',
self._rmq_params.queue.name, len(json.dumps(message)))
self._channel.basic_publish(self._rmq_params.exchange.name, routing_key,
self._channel.basic_publish(self._rmq_params.exchange.name,
routing_key,
json.dumps(message, ensure_ascii=True),
properties)
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.info('Published message # %i to exchange %s, queue %s, routing_key %s',
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)
return True

except Exception as e: # pylint: disable=broad-exception-caught
logger.error('Publish message problem : (%s) %s', type(e), str(e))
return False
except (AMQPChannelError, AMQPConnectionError) as exc:
# something wrong with RabbitMQ connection; destroy and recreate the daemon Thread
logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s',
type(exc), str(exc))

# create new Thread, abandoning old one (it will shut itself down)
self._create_thread()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think with will cause a problem, but do we need to create a new thread? Isn't it possible that the connection has gone down but the thread is still good?

Copy link
Contributor Author

@mackenzie-grimes-noaa mackenzie-grimes-noaa Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible, but I couldn't figure out how to interrupt the running thread without it exiting/completing.

The way the thread task is written (in the _run() method), it creates a new connection, calls pika's ioloop.start() with it, and sleeps on a loop until something sets self._stopping to True. Once that changes to True, the thread returns.

So I'm not sure how we would replace the self._connection out from under the thread, unless we had some clever way for the thread to watch for its connection to change and start ioloop on the new connection instance if it does.

if not self._wait_for_channel_to_be_ready():
logger.warning('Second attempt to connect to RabbitMQ failed. Cannnot publish')
return False

try:
self._channel.basic_publish(self._rmq_params.exchange.name,
routing_key,
json.dumps(message, ensure_ascii=True),
properties)
except (AMQPChannelError, AMQPConnectionError) as retry_exc:
logger.error('Second attempt to publish message failed: (%s) %s',
type(retry_exc), str(retry_exc))
return False

# publish worked on the first (or second) try
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.info('Published message # %i to exchange %s, queue %s, routing_key %s',
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)

return True

def start(self, is_ready: Future | None = None):
"""
Expand All @@ -151,13 +174,13 @@ def start(self, is_ready: Future | None = None):
RuntimeError: if PublishConfirm thread is already running
"""
logger.info('Starting thread with callback %s. is_alive? %s, self._channel: %s',
is_ready, self._thread.is_alive(), self._channel)
is_ready, self._is_running(), self._channel)

if is_ready is not None:
self._is_ready_future = is_ready # to be invoked after all pika setup is done

# not possible to start Thread when it's already running
if self._thread.is_alive() or (self._connection is not None and self._connection.is_open):
if self._is_running() or self._is_connected():
raise RuntimeError('PublishConfirm thread already running, cannot be started')

self._thread.start()
Expand All @@ -170,15 +193,23 @@ def stop(self):
Starting the IOLoop again will allow the publisher to cleanly
disconnect from RabbitMQ.
"""
logger.info('Stopping')
logger.info('Stopping Thread %s', self._thread and self._thread.name)
self._stopping = True
self._close_connection()
self._stopping = False # done stopping

def _is_connected(self) -> bool:
"""True if RabbitMQ Connection and Channel exist and are open"""
return (self._connection is not None and self._connection.is_open
and self._channel is not None and self._channel.is_open)

def _is_running(self) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason to not make this a property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that I could for this one, since the underlying code (Thread.is_alive()) is type Callable[[None], bool] instead of simply bool.

Can the property tag do some magic to invoke a function and get the resulting True/False when you access the property?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure you could make this a property by just adding the annotation, and it would work. However, when I read this code the first time I thought _is_connected() and _is_running() are the same type of functions and thus should have same way of accessing. I still think the access should be the same but would now suggest not making _is_connected() a property, I'm not sure if a private(protected) property is very pythonic. But in the end, I don't think it really matters.

"""True if PublishConfirm Thread exists and is running"""
return self._thread and self._thread.is_alive()

def _run(self):
"""Run a new thread: get a new RMQ connection, and start looping until stop() is called"""
logger.debug('Creating connection')
self._connection = self._create_connection()
self._connection.ioloop.start()
time.sleep(0.2)
Expand All @@ -189,6 +220,19 @@ def _run(self):
if self._connection is not None and not self._connection.is_closed:
# Finish closing
self._connection.ioloop.start()
logger.info('Thread %s finished stopping', self._thread.name)

def _create_thread(self):
"""
Create new python Thread for this PublishConfirm, allowing any pre-existing Thread to stop
"""
if self._thread is not None:
self.stop() # halt previously existing Thread to be sure it exits eventually
self._records = PublishConfirmRecords() # reset delivery_tag and such

self._thread = Thread(name=f'PublishConfirm-{self._next_thread_id()}',
daemon=True,
target=self._run)

def _create_connection(self):
"""This method connects to RabbitMQ, returning the connection handle.
Expand Down Expand Up @@ -224,8 +268,7 @@ def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool:
logger.debug(self._channel)
logger.debug('----------------------')

if (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
if self._is_connected():
return True # connection and channel already open, no setup needed

logger.info('Channel is not ready to publish, calling start() now')
Expand Down Expand Up @@ -389,11 +432,9 @@ def _on_bindok(self, _unused_frame: Method):
self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message
self._channel.confirm_delivery(self._on_delivery_confirmation)

# notify up that channel can now be published to
logger.debug('RabbitMQ channel ready, passing control back to caller')
if self._is_ready_future:
self._is_ready_future.set_result(True)

# self.schedule_next_message()
self._is_ready_future.set_result(True) # notify that channel can now be published to

def _on_delivery_confirmation(self, method_frame: Method):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
Expand Down
8 changes: 3 additions & 5 deletions python/idsse_common/test/test_aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def test_aws_ls_without_prepend_path(aws_utils: AwsUtils, mock_exec_cmd):
mock_exec_cmd.assert_called_once()


def test_aws_ls_retries_with_s3_command_line(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
def test_aws_ls_retries_with_s3(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
# fails first call, succeeds second call
mock_exec_cmd_failure = Mock(
side_effect=[FileNotFoundError, EXAMPLE_FILES])
Expand All @@ -99,7 +99,7 @@ def test_aws_ls_retries_with_s3_command_line(aws_utils: AwsUtils, monkeypatch: M
assert mock_exec_cmd_failure.call_count == 2


def test_aws_ls_returns_empty_array_on_error(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
def test_aws_ls_on_error(aws_utils: AwsUtils, monkeypatch: MonkeyPatch):
mock_exec_cmd_failure = Mock(side_effect=PermissionError('No permissions'))
monkeypatch.setattr('idsse.common.aws_utils.exec_cmd',
mock_exec_cmd_failure)
Expand Down Expand Up @@ -165,9 +165,7 @@ def test_get_issues_with_same_start_stop(aws_utils: AwsUtils, mock_exec_cmd):
assert result[0] == EXAMPLE_ISSUE


def test_get_issues_latest_issue_from_today_if_no_args_passed(aws_utils: AwsUtils,
mock_exec_cmd):

def test_get_issues_latest_issue_default_today(aws_utils: AwsUtils, mock_exec_cmd):
result = aws_utils.get_issues()
# with current mocks returned issue (latest issue) will always be "now" with
# truncated minute, second, and microsecond
Expand Down
Loading
Loading