Can't handle ConnectionLost when used as a Consumer #34
I did look at the example that uses |
@TarasLevelUp Thanks for getting in touch.

def f(exc):
    # custom error handling logic
    print(exc)

connection.on_connection_lost(f) |
Looks good to me, for a start at least. It is consistent with the other parts of the API. But above this
I can make a prototype for that. |
The problem is that the server can terminate the connection at any time, not necessarily while you're waiting to get an item from a queue. And if you're using

I do agree that attempting to use a connection which we know to be closed should throw an error (and I think in some cases it already does), and that any running consumers should be cancelled. Some of that functionality already works, I think. |
Yes, I do know that; that is why the
Yes, I did see this behaviour, but again we really should create a separate Exception for Disconnected cases, so people can catch just that.
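For illustration, a dedicated exception could be as small as this (just a sketch; the exact name and attributes are an assumption, not asynqp's current API):

class ConnectionLostError(Exception):
    # Hypothetical: raised when the broker closes the connection unexpectedly.
    def __init__(self, message, original_exc=None):
        super().__init__(message)
        self.original_exc = original_exc  # keep the underlying cause, if any

Consumers could then write except ConnectionLostError: to handle only the disconnect case and let every other exception propagate. |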
@TarasLevelUp I think you're right. So I think this work would involve a few things:
class MyConsumer:
    def __call__(self, msg):
        pass

    def error(self, exception):
        pass

It should also continue to work with plain functions for the sake of simplicity (if you don't need error handling, it should be easy to use a function) and backwards compatibility.
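For example, both of these call styles could be supported (a sketch against the API being discussed here, not final code):

# object consumer with error handling
queue.consume(MyConsumer())

# plain function when no error handling is needed
def handle(msg):
    print(msg)

queue.consume(handle)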
I won't really have time to work on this for a little while, though I would accept a pull request. |
Wow, didn't expect such a TestSuite... It's hard to read all the cases with so much OOP involved. Kinda expected simple Python TestCases. It will take some time to hack into those =) |
@TarasLevelUp Does the API defined in the docstring in my latest commit look like what you had in mind? |
Hi, sorry for the long feedback, could not find enough time.

queue = yield from connect_and_declare_queue()
consumer = queue.consume(no_ack=True)
while True:
    try:
        msg = yield from consumer.get()
    except ConnectionLostError:
        print("Connection lost.")
        # Do reconnect here or exit application...
    # Process message

I do understand that this can be achieved by using |
My thought was that you'd be able to pass in an object exposing a

class QueuingConsumer:
    def __init__(self):
        self.q = asyncio.Queue()
        self.exc = None

    def __call__(self, msg):
        self.q.put_nowait(msg)

    def onerror(self, exc):
        self.exc = exc

    def get(self):
        if self.exc is not None:
            raise self.exc
        return (yield from self.q.get())

queue = yield from connect_and_declare_queue()
qc = QueuingConsumer()
queue.consume(qc)
yield from qc.get()

This API is more general than your proposal because it allows clients to choose a strategy for consuming messages and handling errors. |
It is quite cheap to implement the QueueConsumer on the library side, so why not? |
channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new QueueingConsumer(channel) {
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // consumer has been cancelled unexpectedly
    }
};
channel.basicConsume(queue, consumer);

This is more-or-less exactly the API I'm proposing.
|
How about we add a
|
It looks pretty idiomatic to me. What don't you like about it in particular? |
This one's easier, isn't it:

queue = yield from connect_and_declare_queue()
qc = queue.consume(no_ack=True)
yield from qc.get()

And I don't want to expose the class as part of the public API. Reasons:

async def start():
    queue = await connect_and_declare_queue()
    async for msg in queue.consume(no_ack=True):
        # Process message
        pass

in Python 3.5. So I want a simple API that will not break backward compatibility later. If we give them a class, people will subclass it and might get things broken after the next release. |
I'm +1 on this ticket. I agree that throwing exceptions into the event loop is pretty ugly. I've looked through your suggestions about the interface, and I have something to offer too.
def consume(callback, *, error_callback=None, no_local=False, no_ack=False, exclusive=False, arguments=None)
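Used roughly like this (a sketch assuming the signature above; the handler names are illustrative):

def on_message(msg):
    # handle the message
    pass

def on_error(exc):
    # e.g. reconnect if exc signals a lost connection
    pass

queue.consume(on_message, error_callback=on_error, no_ack=True)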
|
Honestly, I still prefer the

queue = yield from connect_and_declare_queue()
qc = queue.consume(no_ack=True)
yield from qc.get()

async def start():
    queue = await connect_and_declare_queue()
    async for msg in queue.consume(no_ack=True):
        # Process message
        pass

With both of these proposals, the interface of

With the

def consume(callback, *, error_callback=None, no_local=False, no_ack=False, exclusive=False, arguments=None)

This is not a bad idea; it's roughly isomorphic to the

Note that you can fairly straightforwardly convert between the two interfaces:

# one way
class ConsumerAdapter:  # or something
    def __init__(self, callback, error_callback):
        self._callback = callback
        self._error_callback = error_callback

    def __call__(self, *args, **kwargs):
        return self._callback(*args, **kwargs)

    def on_error(self, *args, **kwargs):
        return self._error_callback(*args, **kwargs)

queue.consume(ConsumerAdapter(my_callback, my_error_callback))

# the other way
queue.consume(my_consumer, error_callback=my_consumer.on_error)

So the question is, which do we want to be more awkward - the case when your two callbacks are unrelated (you have to wrap them in an object) or the case when your two callbacks naturally form an object (you have to unpack the methods and pass them in)? I suppose there are some secondary concerns - how many arguments to

Intuition tells me that the
I don't like this because it requires users to do a run-time type-test of the callback's argument every time. This code will be duplicated in every consumer:

def my_consumer(x):
    if isinstance(x, Exception):
        # handle the error
        pass
    else:
        # process the message
        pass

|
Hi again. I understand that people are using the library and we don't want to break that, but I think working with something that can

About your concerns on the interface of

As for the 2nd proposal, I meant to say:

async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:
        # Process message
        pass

Missed the await call. It's the same interface as with

What is good about this behaviour is that we will have normal Python exceptions propagated through the code. Let's say:

@asyncio.coroutine
def consume(connection):
    queue = yield from declare_queue(connection)
    consumer = yield from queue.create_consumer(no_ack=True)
    while True:
        try:
            msg = yield from consumer.get()
        except asynqp.Cancelled:
            break
        # Process message

# Somewhere else in code
consumer.cancel()

connection = yield from connect()
while True:
    try:
        yield from consume(connection)
    except asynqp.Disconnected:
        connection = yield from reconnect()
    # We can finalize other code here, as we clearly know we are finished here.

How do you implement clean code for reconnect with callbacks? I came up with:

@asyncio.coroutine
def process_message(msg):
    # Process message
    pass

@asyncio.coroutine
def on_error(error):
    if isinstance(error, asynqp.Disconnected):
        connection = yield from reconnect()
        asyncio.async(consume(connection))
    else:
        # We can finalize other code here, I think...
        pass

@asyncio.coroutine
def consume(connection):
    queue = yield from declare_queue(connection)
    # Let's even say we can pass coroutines in the ConsumingAdapter
    consumer = yield from queue.consume(
        ConsumingAdapter(process_message, on_error), no_ack=True)

# Somewhere else in code
consumer.cancel()

connection = yield from connect()
asyncio.async(consume(connection)) |
About use cases of

async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:
        # Process message
        pass

consuming_task = asyncio.async(start())
# Somewhere else
consuming_task.cancel()  # We cancel the task here, not our consumer

And we can also leave the

async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:  # This will just stop after consumer is cancelled
        # Process message
        pass

# Somewhere else in another coroutine
await consumer.cancel()

We can write the same use case in Python 3.4 using

@asyncio.coroutine
def start():
    queue = yield from connect_and_declare_queue()
    consumer = yield from queue.create_consumer(no_ack=True)
    while True:
        try:
            msg = yield from consumer.get()
        except asynqp.Cancelled:
            break
        # Process message

# Somewhere else in code
yield from consumer.cancel() |
It's a pretty common use case: a callback handles messages, one handler per consumer; and one error handler per connection, which tries to reconnect.

# one way
class ConsumerAdapter:  # or something

It's a lot to reimplement for every consumer. I'd prefer such a thing kept inside the library, not as part of its interface.

queue.consume(ConsumerAdapter(my_callback, my_error_callback))

It doesn't change anything except the extra characters to type. |
@TarasLevelUp |
@anton-ryzhov I agree that

It's not a binary choice between callbacks and

Again, it's a question of which is more convenient. Both options seem quite natural to me, though I do agree that

Ultimately, what I value is a simple and consistent API. By 'consistent' I mean consistency within the API, consistency between versions of

Also, @TarasLevelUp:
You will never leave the

Looking at your suggested alternative API:

consumer = await queue.create_consumer(no_ack=True)
async for msg in consumer:  # This will just stop after consumer is cancelled
    # Process message
    pass

# Somewhere else in another coroutine
await consumer.cancel()

To me, this scarcely looks different than what I'm proposing:

consumer = QueuingConsumer()
await queue.consume(consumer)
async for msg in consumer:  # This will just stop after consumer is cancelled
    # Process message
    pass

# Somewhere else in another coroutine
await consumer.cancel()

... but I believe my proposal to be more consistent, as outlined above. |
Am I right that I (the library user) have to put it together myself? Create some class with handlers, instantiate it with some external links to my application, and then send it to

But what if they are not the same? Users will have to join unrelated things into one class.

What problems do you see with 3.4 compatibility? All
Talking about finalization and task cancellation, I believe it should be used like that:

try:
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:  # This will just stop after consumer is cancelled
        # Process message
        pass
finally:
    await consumer.cancel()

# Somewhere else in another coroutine
await consumer.cancel()  # will naturally stop iteration, finally will do nothing
# or
coro.cancel()  # will raise CancelledError inside coro, finally will do cleanup |
You have to write that code somewhere.
I've already explained that it's straightforward for
At its core, yes, but you don't get things like
An idea which seems simple can often have unexpected consequences or interactions with other parts of a system. I've already explained why I am concerned about inconsistency.
Your try/finally idea looks OK to me. What are the consequences of forgetting to use a |
It's not worse than the current callback implementation; there it is much simpler to forget about cancellation. |
I feel like we're not making much progress here. To achieve some clarity, let me enumerate some of the proposals and my thoughts on the pros and cons of each one, with regard to the common use case of queuing up messages in memory and getting them "synchronously":

1.
|
Hello again

5. We leave
|
Now in another comment I will describe why I care so much about it.
consumer = QueuedConsumer(loop=loop)
handle = yield from self.queue.consume(consumer, no_ack=True)
msg = yield from consumer.get()
# Somewhere else
yield from handle.close()
I think that by trying to combine 2 interfaces into 1, we will make it hard for users to understand how to use it. It's quite easy to describe proposal 5, as it is a new, simple, straightforward method without class inheritance, callbacks or adapters.
@benjamin-hodgson Why would you want to do something unusual? Any examples of use cases? I can't come up with any (.
I would be amenable to implementing |
If we do make create_consumer, you understand that it will return 1 object, not 2. So cancellation will be as in proposal 5. |
It can just return 2 objects.

cancellation_token, consumer = yield from queue.create_consumer(...)
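For example (a sketch of how the two objects might be used; the cancellation token's interface is an assumption):

cancellation_token, consumer = yield from queue.create_consumer(no_ack=True)

msg = yield from consumer.get()  # the consumer only yields messages

# Somewhere else in code
yield from cancellation_token.cancel()  # cancellation lives on the separate token

|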
I'm +1 for the fifth proposal. I didn't get what

I remind you that throwing exceptions inside |
Why do you want

Implementation is simple if we don't set |
Made some progress on the issue in my branch, and now I can see another reason to have a separate API for QueuedConsumer:
|
In a separate branch I created the 5th proposal implementation, with examples for Python 3.4 and Python 3.5.

async with self.queue.queued_consumer() as consumer:
    async for msg in consumer:
        pass

|
Good idea! Nice way to ensure the consumer gets cancelled in the event of an application error. |
Hello, thanks for the great library!
Currently I am creating a consumer for indexing objects into another DB. It's quite simple; here's a snippet that illustrates what I want to do:
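Roughly, the consumer looks like this (a simplified sketch; declare_queue and index_into_db stand in for the real application code):

# Hypothetical sketch, not the original snippet
@asyncio.coroutine
def start_indexing(connection):
    queue = yield from declare_queue(connection)

    def on_message(msg):
        # index the message into the other DB
        index_into_db(msg)

    yield from queue.consume(on_message)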
The problem is that when the connection is lost, I have no way to see it except in the log: