diff --git a/README.md b/README.md index 616c195..bd42476 100644 --- a/README.md +++ b/README.md @@ -259,7 +259,7 @@ Example: await producer.reconnect_stream(stream) ``` -Please take a look at the complete examples [here](https://github.com/qweeze/rstream/blob/master/docs/examples/check_connection_broken/) +Please take a look at the complete reliable client example [here](https://github.com/qweeze/rstream/blob/master/docs/examples/reliable_client/) ### Metadata Update @@ -268,7 +268,7 @@ linked to the stream and then it sends the Metadata update event. the behaviour is similar to what we have for disconnections. In case of the Producer/Superstream Producer the Client will try to automatically reconnect while the Consumer needs to manage the on_close_handler event. -Please take a look at the complete examples [here](https://github.com/qweeze/rstream/blob/master/docs/examples/metadata_update/) +Please take a look at the complete reliable client example [here](https://github.com/qweeze/rstream/blob/master/docs/examples/reliable_client/) ## Load Balancer diff --git a/docs/examples/check_connection_broken/README.md b/docs/examples/check_connection_broken/README.md deleted file mode 100644 index 0f0e70d..0000000 --- a/docs/examples/check_connection_broken/README.md +++ /dev/null @@ -1,20 +0,0 @@ -Connection broken ---- - -Currently the client supports auto-reconnect just for Producer and SuperstreamProducer. - -The client does not support auto-reconnect at consumer side at the moment, but it allows you to be notified if such -event happens and take action through a callback. - -This callback can be passed to the constructor of (superstream)consumers during instantiation. - -You can use these callbacks in order to notify your main flow that a disconnection happened to properly close -consumers and producers or you can use the method reconnect_stream in order to try to reconnect to the stream. - -You can find the examples in this folder for the close and reconnect scenarios for consumers. - -# Specify an offset to restart - -In case of a consumer or super_stream_consumer you can specify in the reconnect_stream method the offset you want to -restart consuming. If you don't specify it by default it reconnects from the last consumed message. 
- diff --git a/docs/examples/check_connection_broken/consumer_handle_connections_issues_with_close.py b/docs/examples/check_connection_broken/consumer_handle_connections_issues_with_close.py deleted file mode 100644 index 46f556d..0000000 --- a/docs/examples/check_connection_broken/consumer_handle_connections_issues_with_close.py +++ /dev/null @@ -1,59 +0,0 @@ -import asyncio -import signal - -from rstream import ( - AMQPMessage, - Consumer, - MessageContext, - OnClosedErrorInfo, - amqp_decoder, -) - -STREAM = "my-test-stream" -COUNT = 0 -connection_is_closed = False - - -async def consume(): - async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None: - print( - "connection has been closed from stream: " - + str(disconnection_info.streams) - + " for reason: " - + str(disconnection_info.reason) - ) - - global connection_is_closed - - # avoid multiple simultaneous disconnection to call close multiple times - if connection_is_closed is False: - await consumer.close() - connection_is_closed = True - - consumer = Consumer( - host="localhost", - port=5552, - vhost="/", - username="guest", - password="guest", - on_close_handler=on_connection_closed, - ) - - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close())) - - async def on_message(msg: AMQPMessage, message_context: MessageContext): - stream = message_context.consumer.get_stream(message_context.subscriber_name) - offset = message_context.offset - global COUNT - COUNT = COUNT + 1 - if COUNT % 1000000 == 0: - # print("Got message: {} from stream {}, offset {}".format(msg, stream, offset)) - print("consumed 1 million messages") - - await consumer.start() - await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder) - await consumer.run() - - -asyncio.run(consume()) diff --git a/docs/examples/check_connection_broken/consumer_handle_connections_issues_with_reconnection.py b/docs/examples/check_connection_broken/consumer_handle_connections_issues_with_reconnection.py deleted file mode 100644 index 53e5ed8..0000000 --- a/docs/examples/check_connection_broken/consumer_handle_connections_issues_with_reconnection.py +++ /dev/null @@ -1,61 +0,0 @@ -import asyncio -import signal - -from rstream import ( - AMQPMessage, - Consumer, - MessageContext, - OnClosedErrorInfo, - amqp_decoder, -) - -STREAM = "my-test-stream" -COUNT = 0 -connection_is_closed = False - - -async def consume(): - async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None: - print( - "connection has been closed from stream: " - + str(disconnection_info.streams) - + " for reason: " - + str(disconnection_info.reason) - ) - - global connection_is_closed - - for stream in disconnection_info.streams: - # restart from last offset in subscriber - # alternatively you can specify an offset to reconnect - await consumer.reconnect_stream(stream) - - consumer = Consumer( - host="localhost", - port=5552, - vhost="/", - username="guest", - password="guest", - on_close_handler=on_connection_closed, - ) - - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close())) - - async def on_message(msg: AMQPMessage, message_context: MessageContext): - stream = message_context.consumer.get_stream(message_context.subscriber_name) - offset = message_context.offset - global COUNT - COUNT = COUNT + 1 - if COUNT % 1000000 == 0: - # print("Got message: {} from stream {}, offset {}".format(msg, stream, offset)) - print("consumed 1 
million messages") - - await consumer.start() - await consumer.subscribe( - stream=STREAM, callback=on_message, decoder=amqp_decoder, subscriber_name="subscriber_1" - ) - await consumer.run() - - -asyncio.run(consume()) diff --git a/docs/examples/check_connection_broken/superstream_consumer_handle_connections_issues.py b/docs/examples/check_connection_broken/superstream_consumer_handle_connections_issues.py deleted file mode 100644 index 98abb8c..0000000 --- a/docs/examples/check_connection_broken/superstream_consumer_handle_connections_issues.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -import signal - -from rstream import ( - AMQPMessage, - ConsumerOffsetSpecification, - MessageContext, - OffsetType, - OnClosedErrorInfo, - SuperStreamConsumer, - amqp_decoder, -) - -count = 0 -connection_is_closed = False - - -async def on_message(msg: AMQPMessage, message_context: MessageContext): - global count - count += 1 - if (count % 100000) == 0: - stream = await message_context.consumer.stream(message_context.subscriber_name) - offset = message_context.offset - print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset)) - - -async def consume(): - async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None: - print( - "connection has been closed from stream: " - + str(disconnection_info.streams) - + " for reason: " - + disconnection_info.reason - ) - - global connection_is_closed - if connection_is_closed is False: - connection_is_closed = True - # avoid multiple simultaneous disconnection to call close multiple times - await consumer.close() - - consumer = SuperStreamConsumer( - host="localhost", - port=5552, - vhost="/", - username="guest", - password="guest", - super_stream="invoices", - on_close_handler=on_connection_closed, - ) - - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close())) - offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None) - await consumer.start() - await consumer.subscribe( - callback=on_message, decoder=amqp_decoder, offset_specification=offset_specification - ) - await consumer.run() - - -asyncio.run(consume()) diff --git a/docs/examples/check_connection_broken/superstream_consumer_handle_reconnection.py b/docs/examples/check_connection_broken/superstream_consumer_handle_reconnection.py deleted file mode 100644 index b918038..0000000 --- a/docs/examples/check_connection_broken/superstream_consumer_handle_reconnection.py +++ /dev/null @@ -1,61 +0,0 @@ -import asyncio -import signal - -from rstream import ( - AMQPMessage, - ConsumerOffsetSpecification, - MessageContext, - OffsetType, - OnClosedErrorInfo, - SuperStreamConsumer, - amqp_decoder, -) - -count = 0 -connection_is_closed = False - - -async def on_message(msg: AMQPMessage, message_context: MessageContext): - global count - count += 1 - if (count % 100000) == 0: - stream = await message_context.consumer.stream(message_context.subscriber_name) - offset = message_context.offset - print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset)) - - -async def consume(): - async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None: - print( - "connection has been closed from stream: " - + str(disconnection_info.streams) - + " for reason: " - + disconnection_info.reason - ) - - for stream in disconnection_info.streams: - # restart from last offset in subscriber - # alternatively you can specify an offset to reconnect - await 
consumer.reconnect_stream(stream) - - consumer = SuperStreamConsumer( - host="localhost", - port=5552, - vhost="/", - username="guest", - password="guest", - super_stream="invoices", - on_close_handler=on_connection_closed, - ) - - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close())) - offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None) - await consumer.start() - await consumer.subscribe( - callback=on_message, decoder=amqp_decoder, offset_specification=offset_specification - ) - await consumer.run() - - -asyncio.run(consume()) diff --git a/docs/examples/metadata_update/README.md b/docs/examples/metadata_update/README.md deleted file mode 100644 index 8972ca9..0000000 --- a/docs/examples/metadata_update/README.md +++ /dev/null @@ -1,22 +0,0 @@ -Simple scenarios for Metadata update management ---- - -If the streams topology changes (ex:Stream deleted or add/remove follower), the client receives a MetadataUpdate event. - -The server removes the producers and consumers linked to the stream before sending the Metadata update event. - -Similarly to the disconnection scenario the Producer tries to automatically reconnect while the Consumer gets notified by a callback - -After this the client can try to reconnect. - -Here you can find examples for producers and consumers. - -You can start the producer and the consumer and then force the server to change the topology of the stream for example with this command: - -rabbitmq-streams delete_replica my-test-stream rabbit@rabbitmqcluster-sample-server-0.rabbitmqcluster-sample-nodes.rabbitmq-system - -During the reconnection or the check you could receive the `streamNotAvaiable` error. -`streamNotAvaiable` is a temporary problem. It means that the stream is not ready yet. - -Producer/Superstream-producer side there is nothing really much to do (the Producer tries to reconnect automatically). -Consumer side instead you need to define a callback to catch the event and decide what to do. 
\ No newline at end of file diff --git a/docs/examples/metadata_update/super_stream_consumer_metadata_update.py b/docs/examples/metadata_update/super_stream_consumer_metadata_update.py deleted file mode 100644 index 62eefb6..0000000 --- a/docs/examples/metadata_update/super_stream_consumer_metadata_update.py +++ /dev/null @@ -1,77 +0,0 @@ -import asyncio -import signal - -from rstream import ( - AMQPMessage, - ConsumerOffsetSpecification, - MessageContext, - OffsetType, - OnClosedErrorInfo, - SuperStreamConsumer, - amqp_decoder, -) - -count = 0 -connection_is_closed = False - - -async def on_message(msg: AMQPMessage, message_context: MessageContext): - global count - count += 1 - if (count % 100000) == 0: - stream = await message_context.consumer.stream(message_context.subscriber_name) - offset = message_context.offset - print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset)) - - -async def consume(): - async def on_metadata_update(on_closed_info: OnClosedErrorInfo) -> None: - - print( - "connection has been closed from stream: " - + str(on_closed_info.streams) - + " for reason: " - + str(on_closed_info.reason) - ) - - await asyncio.sleep(2) - # reconnect just if the partition exists - for stream in on_closed_info.streams: - backoff = 1 - while True: - try: - print("reconnecting stream: {}".format(stream)) - await consumer.reconnect_stream(stream) - break - except Exception as ex: - if backoff > 32: - # failed to found the leader - print("reconnection failed") - break - backoff = backoff * 2 - print("exception reconnecting waiting 120s: " + str(ex)) - await asyncio.sleep(30) - continue - - consumer = SuperStreamConsumer( - host="test", - port=5552, - vhost="/", - username="guest", - password="guest", - super_stream="invoices", - on_close_handler=on_metadata_update, - load_balancer_mode=True, - ) - - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close())) - offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None) - await consumer.start() - await consumer.subscribe( - callback=on_message, decoder=amqp_decoder, offset_specification=offset_specification - ) - await consumer.run() - - -asyncio.run(consume()) diff --git a/docs/examples/metadata_update/super_stream_producer_metadata_update.py b/docs/examples/metadata_update/super_stream_producer_metadata_update.py deleted file mode 100644 index d2119a4..0000000 --- a/docs/examples/metadata_update/super_stream_producer_metadata_update.py +++ /dev/null @@ -1,64 +0,0 @@ -import asyncio -import time - -from rstream import ( - AMQPMessage, - OnClosedErrorInfo, - RouteType, - StreamDoesNotExist, - SuperStreamProducer, -) - -SUPER_STREAM = "invoices" -MESSAGES = 100000000 -producer_closed = False - -# this value will be hashed using mumh3 hashing algorithm to decide the partition resolution for the message -async def routing_extractor(message: AMQPMessage) -> str: - return message.application_properties["id"] - - -async def publish(): - - # SuperStreamProducer wraps a Producer - async with SuperStreamProducer( - "localhost", - username="test", - password="test", - routing_extractor=routing_extractor, - routing=RouteType.Hash, - super_stream=SUPER_STREAM, - load_balancer_mode=True, - ) as super_stream_producer: - # Sending a million messages - - start_time = time.perf_counter() - for i in range(MESSAGES): - amqp_message = AMQPMessage( - body="hello: {}".format(i), - application_properties={"id": "{}".format(i)}, - ) - global 
producer_closed - - if producer_closed is False: - try: - await super_stream_producer.send(amqp_message) - except Exception as e: - # give some time to the reconnect_stream to reconnect - print("error sending message: {}".format(e)) - producer_closed = False - continue - else: - producer_closed = False - continue - if i % 100000 == 0: - print(f"Published {i} messages to super stream: {SUPER_STREAM}") - await asyncio.sleep(2) - - end_time = time.perf_counter() - print( - f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds to super stream: {SUPER_STREAM}" - ) - - -asyncio.run(publish()) diff --git a/docs/examples/reliable_client/BestPracticesClient.py b/docs/examples/reliable_client/BestPracticesClient.py new file mode 100644 index 0000000..7e706d6 --- /dev/null +++ b/docs/examples/reliable_client/BestPracticesClient.py @@ -0,0 +1,299 @@ +import asyncio +import json +import signal +import time + +# Set of import from rstream needed for the various functionalities +from rstream import ( + AMQPMessage, + ConfirmationStatus, + Consumer, + ConsumerOffsetSpecification, + MessageContext, + OffsetType, + OnClosedErrorInfo, + Producer, + RouteType, + SuperStreamConsumer, + SuperStreamProducer, + amqp_decoder, +) + +# global variables needed by the test +confirmed_count = 0 +messages_consumed = 0 +messages_per_producer = 0 +producer: Producer +consumer: Consumer + +# Load configuration file (appsettings.json) +async def load_json_file(configuration_file: str) -> dict: + data = open("./python_rstream/appsettings.json") + return json.load(data) + + +async def print_test_variables(): + while True: + await asyncio.sleep(5) + # the number of confirmed messages should be the same as the total messages we sent + print("confirmed_count: " + str(confirmed_count)) + print("message consumed: " + str(messages_consumed)) + + +# Routing instruction for SuperStream Producer +async def routing_extractor(message: AMQPMessage) -> str: + return message.application_properties["id"] + + +# Make producers (producer or superstream producer) +async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer: + host = rabbitmq_data["Host"] + username = rabbitmq_data["Username"] + password = rabbitmq_data["Password"] + port = rabbitmq_data["Port"] + vhost = rabbitmq_data["Virtualhost"] + load_balancer = bool(rabbitmq_data["LoadBalancer"]) + stream_name = rabbitmq_data["StreamName"] + + if bool(rabbitmq_data["SuperStream"]) is False: + + producer = Producer( + host=host, + username=username, + password=password, + port=port, + vhost=vhost, + load_balancer_mode=load_balancer, + ) + + else: + + producer = SuperStreamProducer( # type: ignore + host=host, + username=username, + password=password, + port=port, + vhost=vhost, + load_balancer_mode=load_balancer, + super_stream=stream_name, + routing=RouteType.Hash, + routing_extractor=routing_extractor, + ) + + return producer + + +# metadata and disconnection events for consumers +async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None: + + print( + "connection has been closed from stream: " + + str(on_closed_info.streams) + + " for reason: " + + str(on_closed_info.reason) + ) + + await asyncio.sleep(2) + # reconnect just if the partition exists + for stream in on_closed_info.streams: + backoff = 1 + while True: + try: + print("reconnecting stream: {}".format(stream)) + await consumer.reconnect_stream(stream) + break + except Exception as ex: + if backoff > 32: + # failed to found the leader + print("reconnection failed") + break + 
backoff = backoff * 2 + print("exception reconnecting waiting 120s: " + str(ex)) + await asyncio.sleep(30) + continue + + +# Make consumers +async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer: + + host = rabbitmq_data["Host"] + username = rabbitmq_data["Username"] + password = rabbitmq_data["Password"] + port = rabbitmq_data["Port"] + vhost = rabbitmq_data["Virtualhost"] + load_balancer = bool(rabbitmq_data["LoadBalancer"]) + stream_name = rabbitmq_data["StreamName"] + + if bool(rabbitmq_data["SuperStream"]) is False: + + consumer = Consumer( + host=host, + username=username, + password=password, + port=port, + vhost=vhost, + load_balancer_mode=load_balancer, + on_close_handler=on_close_connection, + ) + + else: + + consumer = SuperStreamConsumer( # type: ignore + host=host, + username=username, + password=password, + port=port, + vhost=vhost, + load_balancer_mode=load_balancer, + super_stream=stream_name, + on_close_handler=on_close_connection, + ) + + return consumer + + +# Where the confirmation happens +async def _on_publish_confirm_client(confirmation: ConfirmationStatus) -> None: + global confirmed_count + if confirmation.is_confirmed: + confirmed_count = confirmed_count + 1 + else: + print( + "message id: {} not confirmed. Response code {}".format( + confirmation.message_id, confirmation.response_code + ) + ) + + +async def on_message(msg: AMQPMessage, message_context: MessageContext): + global messages_consumed + messages_consumed = messages_consumed + 1 + # some printf after some messages consumed in order to check that we are working... + if (messages_consumed % 100000) == 0: + stream = await message_context.consumer.stream(message_context.subscriber_name) + offset = message_context.offset + print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset)) + + +async def publish(rabbitmq_configuration: dict): + + global producer + global messages_per_producer + + stream_name = rabbitmq_configuration["StreamName"] + is_super_stream_scenario = bool(rabbitmq_configuration["SuperStream"]) + messages_per_producer = int(rabbitmq_configuration["MessagesToSend"]) + producers = int(rabbitmq_configuration["Producers"]) + delay_sending_msg = int(rabbitmq_configuration["DelayDuringSendMs"]) + + producer = await make_producer(rabbitmq_configuration) # type: ignore + await producer.start() + + # create a stream if it doesn't already exist + if not is_super_stream_scenario: + for p in range(producers): + await producer.create_stream(stream_name + "-" + str(p), exists_ok=True) + + start_time = time.perf_counter() + + for i in range(messages_per_producer): + try: + await asyncio.sleep(delay_sending_msg) + except asyncio.exceptions.CancelledError: + print("exception in sleeping") + return + + amqp_message = AMQPMessage( + body="hello: {}".format(i), + application_properties={"id": "{}".format(i)}, + ) + # send is asynchronous + if not is_super_stream_scenario: + for p in range(producers): + try: + await producer.send( + stream=stream_name + "-" + str(p), + message=amqp_message, + on_publish_confirm=_on_publish_confirm_client, + ) + except Exception as ex: + print("exception while sending " + str(ex)) + + else: + try: + await producer.send(message=amqp_message, on_publish_confirm=_on_publish_confirm_client) # type: ignore + except Exception as ex: + print("exception while sending " + str(ex)) + + await producer.close() + + end_time = time.perf_counter() + print( + f"Sent {messages_per_producer} messages for each of the {producers} producers 
in {end_time - start_time:0.4f} seconds" + ) + + +async def consume(rabbitmq_configuration: dict): + + global consumer + + is_super_stream_scenario = bool(rabbitmq_configuration["SuperStream"]) + consumers = int(rabbitmq_configuration["Consumers"]) + stream_name = rabbitmq_configuration["StreamName"] + + consumer = await make_consumer(rabbitmq_configuration) # type: ignore + + # create a stream if it doesn't already exist + if not is_super_stream_scenario: + for p in range(consumers): + await consumer.create_stream(stream_name + "-" + str(p), exists_ok=True) + + offset_spec = ConsumerOffsetSpecification(OffsetType.FIRST, None) + await consumer.start() + if not is_super_stream_scenario: + for c in range(consumers): + await consumer.subscribe( + stream=stream_name + "-" + str(c), + callback=on_message, + decoder=amqp_decoder, + offset_specification=offset_spec, + ) + else: + await consumer.subscribe(callback=on_message, decoder=amqp_decoder, offset_specification=offset_spec) # type: ignore + + await consumer.run() + + +async def close(producer_task: asyncio.Task, consumer_task: asyncio.Task, printer_test_task: asyncio.Task): + + global producer + global consumer + + await producer.close() + await consumer.close() + + producer_task.cancel() + consumer_task.cancel() + printer_test_task.cancel() + + +async def main(): + + loop = asyncio.get_event_loop() + loop.add_signal_handler( + signal.SIGINT, lambda: asyncio.create_task(close(producer_task, consumer_task, printer_test_task)) + ) + + configuration = await load_json_file("appsettings.json") + rabbitmq_configuration = configuration["RabbitMQ"] + + producer_task = asyncio.create_task(publish(rabbitmq_configuration)) + consumer_task = asyncio.create_task(consume(rabbitmq_configuration)) + + printer_test_task = asyncio.create_task(print_test_variables()) + + await producer_task + await consumer_task + + +asyncio.run(main()) diff --git a/docs/examples/reliable_client/README.md b/docs/examples/reliable_client/README.md new file mode 100644 index 0000000..b67d2a7 --- /dev/null +++ b/docs/examples/reliable_client/README.md @@ -0,0 +1,30 @@ +Complete example of Reliable Client +--- + +This is an example of how to use the client in a reliable way. By following these best practices, you can ensure that your +application will be able to recover from network failures, metadata updates and other issues. + +The client takes as input the parameters listed in appsettings.json, such as the connection parameters, how many streams to use, whether +to run the example in stream or super-stream mode, and whether to use a load balancer. + +The example uses the asynchronous send() together with the confirmation callback in order to check for confirmations. + +### Disconnection management + +Currently the client supports auto-reconnect just for Producer and SuperStreamProducer. +The client does not support auto-reconnect on the consumer side at the moment, but it allows you to be notified if such an event happens and to take action through a callback. +This callback can be passed to the constructor of (super-stream) consumers during instantiation. +You can use these callbacks to notify your main flow that a disconnection happened and properly close consumers and producers, or you can use the method reconnect_stream to try to reconnect to the stream. 
+ +As you can see in the example, the Consumer and the SuperStreamConsumer take an on_close_handler callback which checks +which stream has been disconnected and then calls reconnect_stream in order to resubscribe to it. + +### Metadata Update +If the stream topology changes (e.g. a stream is deleted or a replica is added/removed), the client receives a MetadataUpdate event. +The server removes the producers and consumers linked to the stream before sending the Metadata update event. +Similarly to the disconnection scenario, the Producer and SuperStreamProducer try to automatically reconnect while the +Consumer gets notified by a callback (the same one you can use for disconnection management). +After this the client can try to reconnect. + +As you can see in the example, the Consumer and the SuperStreamConsumer take an on_close_handler callback which checks +which stream has been disconnected and then calls reconnect_stream in order to resubscribe to it. \ No newline at end of file diff --git a/docs/examples/reliable_client/appsettings.json b/docs/examples/reliable_client/appsettings.json new file mode 100644 index 0000000..3f25344 --- /dev/null +++ b/docs/examples/reliable_client/appsettings.json @@ -0,0 +1,16 @@ +{ + "RabbitMQ": { + "Host": "localhost", + "Username": "guest", + "Password": "guest", + "Port": 5552, + "Virtualhost": "/", + "LoadBalancer": true, + "SuperStream": false, + "Producers": 3, + "Consumers": 3, + "DelayDuringSendMs":0, + "MessagesToSend": 100000, + "StreamName": "PythonClientTest" + } +} \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 27c12bf..7661b67 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. 
[[package]] name = "appnope" @@ -41,8 +41,8 @@ platformdirs = ">=2" regex = ">=2020.1.8" tomli = ">=0.2.6,<2.0.0" typing-extensions = [ - {version = ">=3.10.0.0", markers = "python_version < \"3.10\""}, {version = ">=3.10.0.0,<3.10.0.1 || >3.10.0.1", markers = "python_version >= \"3.10\""}, + {version = ">=3.10.0.0", markers = "python_version < \"3.10\""}, ] [package.extras] @@ -793,6 +793,20 @@ ipython-genutils = "*" [package.extras] test = ["pytest"] +[[package]] +name = "types-requests" +version = "2.31.0.20240406" +description = "Typing stubs for requests" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-requests-2.31.0.20240406.tar.gz", hash = "sha256:4428df33c5503945c74b3f42e82b181e86ec7b724620419a2966e2de604ce1a1"}, + {file = "types_requests-2.31.0.20240406-py3-none-any.whl", hash = "sha256:6216cdac377c6b9a040ac1c0404f7284bd13199c0e1bb235f4324627e8898cf5"}, +] + +[package.dependencies] +urllib3 = ">=2" + [[package]] name = "typing-extensions" version = "3.10.0.2" @@ -869,4 +883,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "486c05f9b07ffbb9f0c215157eeb0857b9561767291eb88b3bc7912f357a5904" +content-hash = "5de52c6a920d1f82f3ea206562629607b157264e03c5e0bd8e7a7b78bf3222e5" diff --git a/pyproject.toml b/pyproject.toml index 25f91cb..9552cea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,8 @@ mmh3 = "^4.0.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.0" +requests = "^2.31.0" +types-requests = "^2.31.0.20240406" [tool.black] line-length = 110 diff --git a/rstream/__init__.py b/rstream/__init__.py index bafc7dc..79666fe 100644 --- a/rstream/__init__.py +++ b/rstream/__init__.py @@ -16,7 +16,7 @@ from .amqp import AMQPMessage, amqp_decoder # noqa: E402 from .compression import CompressionType # noqa: E402 -from .constants import OffsetType # noqa: E402 +from .constants import OffsetType # noqa: E402; noqa: E402 from .constants import ( # noqa: E402 ConsumerOffsetSpecification, SlasMechanism, diff --git a/rstream/client.py b/rstream/client.py index 981909f..b923320 100644 --- a/rstream/client.py +++ b/rstream/client.py @@ -157,7 +157,8 @@ async def get_stream_count(self): return len(self._streams) async def remove_stream(self, stream: str): - self._streams.remove(stream) + if stream in self._streams: + self._streams.remove(stream) async def get_available_id(self) -> int: if self._current_id <= 256: @@ -480,21 +481,28 @@ async def query_leader_and_replicas( self, stream: str, ) -> tuple[schema.Broker, list[schema.Broker]]: - metadata_resp = await self.sync_request( - schema.Metadata( - self._corr_id_seq.next(), - streams=[stream], - ), - resp_schema=schema.MetadataResponse, - ) - assert len(metadata_resp.metadata) == 1 - metadata = metadata_resp.metadata[0] - assert metadata.name == stream - - brokers = {broker.reference: broker for broker in metadata_resp.brokers} - leader = brokers[metadata.leader_ref] - replicas = [brokers[replica_ref] for replica_ref in metadata.replicas_refs] - return leader, replicas + + while True: + metadata_resp = await self.sync_request( + schema.Metadata( + self._corr_id_seq.next(), + streams=[stream], + ), + resp_schema=schema.MetadataResponse, + ) + assert len(metadata_resp.metadata) == 1 + metadata = metadata_resp.metadata[0] + assert metadata.name == stream + + if metadata.leader_ref == 65535: + await asyncio.sleep(1) + continue + + brokers = {broker.reference: broker for broker in metadata_resp.brokers} + leader = brokers[metadata.leader_ref] + replicas = 
[brokers[replica_ref] for replica_ref in metadata.replicas_refs] + + return leader, replicas async def subscribe( self, diff --git a/rstream/consumer.py b/rstream/consumer.py index efb9594..33d446c 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -155,6 +155,7 @@ async def _get_or_create_client(self, stream: str) -> Client: connection_closed_handler=self._on_close_handler, connection_name=self._connection_name, ) + leader, replicas = await (await self.default_client).query_leader_and_replicas(stream) broker = random.choice(replicas) if replicas else leader self._clients[stream] = await self._pool.get( @@ -180,7 +181,8 @@ async def _create_subscriber( client = await self._get_or_create_client(stream) # We can have multiple subscribers sharing same connection, so their ids must be distinct - subscription_id = len([s for s in self._subscribers.values() if s.client is client]) + 1 + # subscription_id = len([s for s in self._subscribers.values() if s.client is client]) + 1 + subscription_id = await client.get_available_id() reference = subscriber_name or f"{stream}_subscriber_{subscription_id}" decoder = decoder or (lambda x: x) @@ -357,7 +359,7 @@ async def _on_deliver( await subscriber.client.credit(subscriber.subscription_id, 1) - for (offset, message) in self._filter_messages(frame, subscriber, filter_value): + for offset, message in self._filter_messages(frame, subscriber, filter_value): message_context = MessageContext(self, subscriber.reference, offset, frame.timestamp) maybe_coro = subscriber.callback(subscriber.decoder(message), message_context) @@ -366,6 +368,9 @@ async def _on_deliver( async def _on_metadata_update(self, frame: schema.MetadataUpdate) -> None: + if frame.metadata_info.stream not in self._clients: + return + await self._maybe_clean_up_during_lost_connection(frame.metadata_info.stream) if self._on_close_handler is not None: metadata_update_info = OnClosedErrorInfo("MetaData Update", [frame.metadata_info.stream]) result = self._on_close_handler(metadata_update_info) @@ -445,12 +450,17 @@ def get_stream(self, subscriber_name) -> str: async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> None: curr_subscriber = None + curr_subscriber_id = None for subscriber_id in self._subscribers: if stream == self._subscribers[subscriber_id].stream: curr_subscriber = self._subscribers[subscriber_id] + curr_subscriber_id = subscriber_id + if curr_subscriber_id is not None: + del self._subscribers[curr_subscriber_id] - # close previous clients and re-create a publisher (with a new client) if stream in self._clients: + if curr_subscriber is not None: + await self._clients[stream].free_available_id(curr_subscriber.subscription_id) await self._clients[stream].close() del self._clients[stream] @@ -468,7 +478,7 @@ async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> N asyncio.create_task( self.subscribe( stream=curr_subscriber.stream, - subscriber_name=curr_subscriber.reference, + # subscriber_name=curr_subscriber.reference, callback=curr_subscriber.callback, decoder=curr_subscriber.decoder, offset_specification=offset_specification, @@ -501,3 +511,19 @@ async def _close_locator_connection(self): if await (await self.default_client).get_stream_count() == 0: await (await self.default_client).close() self._default_client = None + + async def _maybe_clean_up_during_lost_connection(self, stream: str): + + curr_subscriber = None + + for subscriber_id in self._subscribers: + if stream == self._subscribers[subscriber_id].stream: + 
curr_subscriber = self._subscribers[subscriber_id] + + if stream in self._clients: + await self._clients[stream].remove_stream(stream) + if curr_subscriber is not None: + await self._clients[stream].free_available_id(curr_subscriber.subscription_id) + if await self._clients[stream].get_stream_count() == 0: + await self._clients[stream].close() + del self._clients[stream]
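For reference, the consumer-side pattern that the new reliable_client README describes (register an on_close_handler and call reconnect_stream for each affected stream) boils down to the following minimal sketch, adapted from the examples removed by this diff. It assumes a broker reachable on localhost:5552 with the default guest credentials and an existing stream named my-test-stream:

```python
import asyncio

from rstream import (
    AMQPMessage,
    Consumer,
    MessageContext,
    OnClosedErrorInfo,
    amqp_decoder,
)

STREAM = "my-test-stream"  # assumed stream name; adjust to your environment


async def consume():
    async def on_connection_closed(disconnection_info: OnClosedErrorInfo) -> None:
        # Called for broken connections and metadata updates alike: reconnect every
        # affected stream. By default reconnect_stream restarts from the last consumed
        # offset; an explicit offset can be passed as the second argument.
        print(
            "disconnected from streams: {} for reason: {}".format(
                disconnection_info.streams, disconnection_info.reason
            )
        )
        for stream in disconnection_info.streams:
            await consumer.reconnect_stream(stream)

    consumer = Consumer(
        host="localhost",
        port=5552,
        vhost="/",
        username="guest",
        password="guest",
        on_close_handler=on_connection_closed,
    )

    async def on_message(msg: AMQPMessage, message_context: MessageContext):
        print("got message with offset", message_context.offset)

    await consumer.start()
    await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder)
    await consumer.run()


asyncio.run(consume())
```

BestPracticesClient.py above uses the same callback for both disconnections and metadata updates, but wraps reconnect_stream in a retry loop (retrying every 30 seconds and giving up once its backoff counter exceeds 32), since the stream leader may not be available immediately after a topology change.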