-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug: idsse-795: PublishConfirm retry connection #68
Changes from all commits
1120d87
ba42f69
12a0965
8f7d91f
a91f524
2bd7184
61f5568
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,12 +19,13 @@ | |
import time | ||
from concurrent.futures import Future | ||
from dataclasses import dataclass, field | ||
from random import randint | ||
from itertools import count | ||
from threading import Thread | ||
from typing import NamedTuple, cast | ||
|
||
from pika import SelectConnection, BasicProperties | ||
from pika.channel import Channel | ||
from pika.exceptions import AMQPConnectionError, AMQPChannelError | ||
from pika.frame import Method | ||
from pika.spec import Basic | ||
|
||
|
@@ -64,6 +65,7 @@ class PublishConfirm: | |
be closed, which usually are tied to permission related issues or | ||
socket timeouts. | ||
""" | ||
# pylint: disable=too-many-instance-attributes | ||
|
||
def __init__(self, conn: Conn, exchange: Exch, queue: Queue): | ||
"""Setup the example publisher object, passing in the RabbitMqUtils we will use to | ||
|
@@ -76,9 +78,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): | |
a "private queue", i.e. not intended for consumers, and all published messages | ||
will have a 10-second TTL. | ||
""" | ||
self._thread = Thread(name=f'PublishConfirm-{randint(0, 9)}', | ||
daemon=True, | ||
target=self._run) | ||
self._thread: None | Thread = None # not initialized yet | ||
self._next_thread_id = count(0).__next__ # start at 0, iterate each time Thread created | ||
|
||
self._connection: SelectConnection | None = None | ||
self._channel: Channel | None = None | ||
|
@@ -89,6 +90,8 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): | |
self._records = PublishConfirmRecords() # data class to track message activity | ||
self._is_ready_future: Future | None = None | ||
|
||
self._create_thread() | ||
|
||
def publish_message(self, | ||
message: dict, | ||
routing_key='', | ||
|
@@ -110,32 +113,52 @@ def publish_message(self, | |
Raises: | ||
RuntimeError: if channel is uninitialized (start() not completed yet) or is closed | ||
""" | ||
properties = BasicProperties(content_type='application/json', | ||
content_encoding='utf-8', | ||
correlation_id=corr_id) | ||
logger.info('Publishing message to queue %s, message length: %d', | ||
self._rmq_params.queue.name, len(json.dumps(message))) | ||
|
||
is_channel_ready = self._wait_for_channel_to_be_ready() | ||
if not is_channel_ready: | ||
logger.error('RabbitMQ channel not established for some reason. Cannnot publish') | ||
return False | ||
|
||
logger.debug('DEBUG: channel is ready to publish message') | ||
try: | ||
properties = BasicProperties(content_type='application/json', | ||
content_encoding='utf-8', | ||
correlation_id=corr_id) | ||
|
||
logger.info('Publishing message to queue %s, message length: %d', | ||
self._rmq_params.queue.name, len(json.dumps(message))) | ||
self._channel.basic_publish(self._rmq_params.exchange.name, routing_key, | ||
self._channel.basic_publish(self._rmq_params.exchange.name, | ||
routing_key, | ||
json.dumps(message, ensure_ascii=True), | ||
properties) | ||
self._records.message_number += 1 | ||
self._records.deliveries[self._records.message_number] = message | ||
logger.info('Published message # %i to exchange %s, queue %s, routing_key %s', | ||
self._records.message_number, self._rmq_params.exchange.name, | ||
self._rmq_params.queue.name, routing_key) | ||
return True | ||
|
||
except Exception as e: # pylint: disable=broad-exception-caught | ||
logger.error('Publish message problem : (%s) %s', type(e), str(e)) | ||
return False | ||
except (AMQPChannelError, AMQPConnectionError) as exc: | ||
# something wrong with RabbitMQ connection; destroy and recreate the daemon Thread | ||
logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s', | ||
type(exc), str(exc)) | ||
|
||
# create new Thread, abandoning old one (it will shut itself down) | ||
self._create_thread() | ||
if not self._wait_for_channel_to_be_ready(): | ||
logger.warning('Second attempt to connect to RabbitMQ failed. Cannnot publish') | ||
return False | ||
|
||
try: | ||
self._channel.basic_publish(self._rmq_params.exchange.name, | ||
routing_key, | ||
json.dumps(message, ensure_ascii=True), | ||
properties) | ||
except (AMQPChannelError, AMQPConnectionError) as retry_exc: | ||
logger.error('Second attempt to publish message failed: (%s) %s', | ||
type(retry_exc), str(retry_exc)) | ||
return False | ||
|
||
# publish worked on the first (or second) try | ||
self._records.message_number += 1 | ||
self._records.deliveries[self._records.message_number] = message | ||
logger.info('Published message # %i to exchange %s, queue %s, routing_key %s', | ||
self._records.message_number, self._rmq_params.exchange.name, | ||
self._rmq_params.queue.name, routing_key) | ||
|
||
return True | ||
|
||
def start(self, is_ready: Future | None = None): | ||
""" | ||
|
@@ -151,13 +174,13 @@ def start(self, is_ready: Future | None = None): | |
RuntimeError: if PublishConfirm thread is already running | ||
""" | ||
logger.info('Starting thread with callback %s. is_alive? %s, self._channel: %s', | ||
is_ready, self._thread.is_alive(), self._channel) | ||
is_ready, self._is_running(), self._channel) | ||
|
||
if is_ready is not None: | ||
self._is_ready_future = is_ready # to be invoked after all pika setup is done | ||
|
||
# not possible to start Thread when it's already running | ||
if self._thread.is_alive() or (self._connection is not None and self._connection.is_open): | ||
if self._is_running() or self._is_connected(): | ||
raise RuntimeError('PublishConfirm thread already running, cannot be started') | ||
|
||
self._thread.start() | ||
|
@@ -170,15 +193,23 @@ def stop(self): | |
Starting the IOLoop again will allow the publisher to cleanly | ||
disconnect from RabbitMQ. | ||
""" | ||
logger.info('Stopping') | ||
logger.info('Stopping Thread %s', self._thread and self._thread.name) | ||
self._stopping = True | ||
self._close_connection() | ||
self._stopping = False # done stopping | ||
|
||
def _is_connected(self) -> bool: | ||
"""True if RabbitMQ Connection and Channel exist and are open""" | ||
return (self._connection is not None and self._connection.is_open | ||
and self._channel is not None and self._channel.is_open) | ||
|
||
def _is_running(self) -> bool: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a good reason to not make this a property? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure that I could for this one, since the underlying code ( Can the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm pretty sure you could make this a property by just adding the annotation, and it would work. However, when I read this code the first time I thought _is_connected() and _is_running() are the same type of functions and thus should have the same way of accessing. I still think the access should be the same but would now suggest not making _is_connected() a property; I'm not sure if a private (protected) property is very pythonic. But in the end, I don't think it really matters. |
||
"""True if PublishConfirm Thread exists and is running""" | ||
return self._thread and self._thread.is_alive() | ||
|
||
def _run(self): | ||
"""Run a new thread: get a new RMQ connection, and start looping until stop() is called""" | ||
logger.debug('Creating connection') | ||
self._connection = self._create_connection() | ||
self._connection.ioloop.start() | ||
time.sleep(0.2) | ||
|
@@ -189,6 +220,19 @@ def _run(self): | |
if self._connection is not None and not self._connection.is_closed: | ||
# Finish closing | ||
self._connection.ioloop.start() | ||
logger.info('Thread %s finished stopping', self._thread.name) | ||
|
||
def _create_thread(self): | ||
""" | ||
Create new python Thread for this PublishConfirm, allowing any pre-existing Thread to stop | ||
""" | ||
if self._thread is not None: | ||
self.stop() # halt previously existing Thread to be sure it exits eventually | ||
self._records = PublishConfirmRecords() # reset delivery_tag and such | ||
|
||
self._thread = Thread(name=f'PublishConfirm-{self._next_thread_id()}', | ||
daemon=True, | ||
target=self._run) | ||
|
||
def _create_connection(self): | ||
"""This method connects to RabbitMQ, returning the connection handle. | ||
|
@@ -224,8 +268,7 @@ def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool: | |
logger.debug(self._channel) | ||
logger.debug('----------------------') | ||
|
||
if (self._connection and self._connection.is_open | ||
and self._channel and self._channel.is_open): | ||
if self._is_connected(): | ||
return True # connection and channel already open, no setup needed | ||
|
||
logger.info('Channel is not ready to publish, calling start() now') | ||
|
@@ -389,11 +432,9 @@ def _on_bindok(self, _unused_frame: Method): | |
self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message | ||
self._channel.confirm_delivery(self._on_delivery_confirmation) | ||
|
||
# notify up that channel can now be published to | ||
logger.debug('RabbitMQ channel ready, passing control back to caller') | ||
if self._is_ready_future: | ||
self._is_ready_future.set_result(True) | ||
|
||
# self.schedule_next_message() | ||
self._is_ready_future.set_result(True) # notify that channel can now be published to | ||
|
||
def _on_delivery_confirmation(self, method_frame: Method): | ||
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this will cause a problem, but do we need to create a new thread? Isn't it possible that the connection has gone down but the thread is still good?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's possible, but I couldn't figure out how to interrupt the running thread without it exiting/completing.
The way the thread task is written (in the `_run()` method), it creates a new connection, calls pika's `ioloop.start()` with it, and sleeps in a loop until something sets `self._stopping` to True. Once that changes to True, the thread returns.
So I'm not sure how we would replace the `self._connection` out from under the thread, unless we had some clever way for the thread to watch for its connection to change and start ioloop on the new connection instance if it does.