Can't handle ConnectionLost when used as a Consumer #34

Open
TarasLevelUp opened this issue Sep 9, 2015 · 36 comments

@TarasLevelUp

Hello, thanks for the great library!
Currently I am creating a consumer for indexing objects into another DB. It's quite simple; here's a snippet that illustrates what I want to do:

import asyncio
import asynqp


@asyncio.coroutine
def consumer_coro(queue):
    while True:
        msg = yield from queue.get()
        # Execute indexing here
        print("msg received", msg.body)
        msg.ack()


@asyncio.coroutine
def main_coro():
    # connect to the RabbitMQ broker
    connection = yield from asynqp.connect(
        'localhost', 5672, username='guest', password='guest')

    # Open a communications channel
    channel = yield from connection.open_channel()

    # Create a queue and an exchange on the broker
    queue = yield from channel.declare_queue('test.queue')
    event_queue = asyncio.Queue()
    consumer = yield from queue.consume(event_queue.put_nowait)

    try:
        yield from consumer_coro(event_queue)
    except asyncio.CancelledError:
        pass
    finally:
        # Maybe we got KeyboardInterrupt? Try to gracefully close everything
        yield from consumer.cancel()
        yield from channel.close()
        yield from connection.close()


def main():
    loop = asyncio.get_event_loop()
    main_task = asyncio.async(main_coro())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        main_task.cancel()
        loop.run_until_complete(main_task)

if __name__ == "__main__":
    main()

The problem is that when the connection is lost, I have no way to detect it other than the log:

Exception in callback _SelectorTransport._call_connection_lost(None)
handle: <Handle _SelectorTransport._call_connection_lost(None)>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/events.py", line 120, in _run
    self._callback(*self._args)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/selector_events.py", line 609, in _call_connection_lost
    self._protocol.connection_lost(exc)
  File "/Users/taras/workspace/vortex_tests/.env/lib/python3.4/site-packages/asynqp/protocol.py", line 53, in connection_lost
    raise ConnectionClosedError('The connection was closed')
asynqp.exceptions.ConnectionClosedError: The connection was closed
@TarasLevelUp
Author

I did look at the example that uses loop.set_exception_handler(connection_lost_handler), but I think that is a horrible solution. We really should have a normal callback (e.g. on the connection).
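
For context, here is a rough sketch of what the loop-level workaround looks like with asyncio's real `loop.set_exception_handler` API. The built-in `ConnectionError` stands in for asynqp's `ConnectionClosedError`, and `make_exception_handler` and `demo` are hypothetical names; the sketch uses modern `async`/`await` rather than the `yield from` style of this thread.

```python
import asyncio


def make_exception_handler(seen):
    """Build a loop-level handler that records connection errors in `seen`."""
    def handler(loop, context):
        exc = context.get("exception")
        if isinstance(exc, ConnectionError):
            # Assumption: ConnectionError stands in for asynqp's error type.
            seen.append(exc)
        else:
            # Anything else keeps asyncio's default logging behaviour.
            loop.default_exception_handler(context)
    return handler


async def demo():
    loop = asyncio.get_running_loop()
    seen = []
    loop.set_exception_handler(make_exception_handler(seen))

    def connection_lost():
        # Mimics a protocol callback raising inside the event loop.
        raise ConnectionError("The connection was closed")

    loop.call_soon(connection_lost)
    await asyncio.sleep(0.01)  # give the callback a chance to run
    return seen
```

The handler is global to the loop, so it sees every unhandled callback error and has to filter by type, which is the awkwardness being objected to here.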

@benjamin-hodgson
Owner

@TarasLevelUp Thanks for getting in touch. asynqp is definitely still somewhat deficient in the area of error handling. Would your proposed API look something like this?

def f(exc):
    # custom error handling logic
    print(exc)

connection.on_connection_lost(f)
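
A minimal sketch of how such a callback registry might behave, assuming callbacks are stored in a list and invoked with the exception when the transport closes. `SketchConnection` and `_connection_lost` are hypothetical names for illustration, not asynqp's implementation.

```python
class SketchConnection:
    """Hypothetical stand-in for a connection object; not asynqp's class."""

    def __init__(self):
        self._lost_callbacks = []
        self.closed = False

    def on_connection_lost(self, callback):
        """Register `callback(exc)` to run when the connection drops."""
        self._lost_callbacks.append(callback)

    def _connection_lost(self, exc):
        # Would be called by the protocol layer when the transport closes.
        self.closed = True
        for callback in self._lost_callbacks:
            callback(exc)
```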

@TarasLevelUp
Author

Looks good to me, for a start at least. It is consistent with the other parts of the API.

But on top of this low-level API, how about we add a higher-level one with features like:

  • A queue-like interface (i.e. you can yield from the queue to get a single element) that could raise a normal DisconnectError.
  • Maybe an auto-reconnect option

I can make a prototype for that.

@benjamin-hodgson
Owner

The problem is that the server can terminate the connection at any time, not necessarily while you're waiting to get an item from a queue. And if you're using consume instead of get then you won't even be waiting; there's no coroutine to throw an exception into.

I do agree that attempting to use a connection which we know to be closed should throw an error (and I think in some cases it already does), and that any running consumers should be cancelled. Some of that functionality already works, I think.

@TarasLevelUp
Author

The problem is that the server can terminate the connection at any time, not necessarily while you're waiting to get an item from a queue. And if you're using consume instead of get then you won't even be waiting; there's no coroutine to throw an exception into.

Yes, I know; that is why a higher-level API for consume is needed, so that we have a coroutine to throw an exception into. With plain consume, the only way to go is to implement the on_connection_lost callback.

I do agree that attempting to use a connection which we know to be closed should throw an error (and I think in some cases it already does), and that any running consumers should be cancelled. Some of that functionality already works, I think.

Yes, I did see this behaviour, but again, we really should create a separate exception for disconnect cases, so people can catch just that.

@benjamin-hodgson
Owner

@TarasLevelUp I think you're right. So I think this work would involve a few things:

  1. Modify the consume API to make it possible to communicate an error to a consumer. I think the object you pass to consume would have a signature which looks something like this:
class MyConsumer:
    def __call__(self, msg):
        pass
    def error(self, exception):
        pass

It should also continue to work with plain functions for the sake of simplicity (if you don't need error handling it should be easy to use a function) and backwards compatibility.

  2. Implement a useful consumer (an object which can be passed to consume) which caches messages in a queue, so clients can yield from it. When the consumer errors, the exception is thrown into the waiting coroutines.
  3. Additionally, provide an on_connection_lost method for Connection, which takes a callback which will be invoked when the connection is closed.

I won't really have time to work on this for a little while, though I would accept a pull request.
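
As a rough illustration of the backwards-compatibility point in step 1, consume could look up the optional `error` method and fall back to a no-op for plain functions. `resolve_error_handler` is a hypothetical helper name; only the `error` method name comes from the class above.

```python
def resolve_error_handler(consumer):
    """Return the consumer's error hook, or a no-op for plain functions."""
    handler = getattr(consumer, "error", None)
    if callable(handler):
        return handler
    # Plain functions have no `error` attribute: errors are ignored.
    return lambda exc: None


class RecordingConsumer:
    """Example consumer object that implements both hooks."""

    def __init__(self):
        self.errors = []

    def __call__(self, msg):
        pass

    def error(self, exception):
        self.errors.append(exception)


def plain_callback(msg):
    pass
```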

@TarasLevelUp
Author

Wow, I didn't expect such a test suite... It's hard to read all the cases with so much OOP involved. I kind of expected simple Python TestCases. It will take some time to hack into those =)

@benjamin-hodgson
Owner

@TarasLevelUp Does the API defined in the docstring in my latest commit look like what you had in mind?

@TarasLevelUp
Author

Hi, sorry for the slow feedback; I could not find enough time.
I think this interface is quite hard to use as is. Asyncio libraries aim to offer a "synchronous" interface instead of a "callback" one. Callbacks should be possible, but for most users it's good to hide them behind a simple synchronous interface.
I see working with the consumer as follows:

queue = yield from connect_and_declare_queue()
consumer = queue.consume(no_ack=True)

while True:
    try:
        msg = yield from consumer.get()
    except ConnectionLostError:
        print("Connection lost.")
        # Do reconnect here or exit application...
    # Process message

I understand that this can be achieved using queue.get, but that is less efficient.
In the case above I presume that consume returns a different kind of consumer if we don't pass in a callback.

@benjamin-hodgson
Owner

My thought was that you'd be able to pass in an object exposing a yield from-based interface. Something like this:

class QueuingConsumer:
    def __init__(self):
        self.q = asyncio.Queue()
        self.exc = None
    def __call__(self, msg):
        self.q.put_nowait(msg)
    def onerror(self, exc):
        self.exc = exc
    @asyncio.coroutine
    def get(self):
        if self.exc is not None:
            raise self.exc
        # Return the message; a bare `yield from` would discard it
        return (yield from self.q.get())

queue = yield from connect_and_declare_queue()
qc = QueuingConsumer()
queue.consume(qc)
yield from qc.get()

This API is more general than your proposal because it allows clients to choose a strategy for consuming messages and handling errors.
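
One caveat with the class above: a coroutine already blocked in get() would not notice a later error, because onerror only stores the exception. A possible variation, sketched here under the assumption that errors travel through the same queue as messages, wakes waiting coroutines too. `SketchQueuingConsumer` is a hypothetical name, and the code uses modern `async`/`await`.

```python
import asyncio


class SketchQueuingConsumer:
    """Hypothetical variant: errors are queued alongside messages."""

    def __init__(self):
        # Create inside a running coroutine so the queue uses that loop.
        self._queue = asyncio.Queue()

    def __call__(self, msg):
        self._queue.put_nowait((True, msg))

    def onerror(self, exc):
        # Queuing the error wakes any coroutine blocked in get().
        self._queue.put_nowait((False, exc))

    async def get(self):
        ok, item = await self._queue.get()
        if not ok:
            # Re-queue so later get() calls fail as well.
            self._queue.put_nowait((False, item))
            raise item
        return item


async def demo():
    consumer = SketchQueuingConsumer()
    consumer("m1")
    consumer.onerror(ConnectionError("lost"))
    first = await consumer.get()
    try:
        await consumer.get()
    except ConnectionError as exc:
        return first, str(exc)
```

Messages delivered before the error are still returned in order; the exception surfaces only once the queue has drained up to it.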

@TarasLevelUp
Author

  1. on_cancel - this will lead to confusion, as you would expect https://www.rabbitmq.com/consumer-cancel.html from this function. As it is implemented now I can't find a use case for it, as you will always have to cancel the consumer yourself, in the same loop, for it to fire.
  2. on_error - this is confusing to me. Is there any error other than a disconnect that can be passed? If there isn't, users can use the on_connection_lost callback described above to do the same, can't they? I think it's more flexible to have the disconnect callback on the connection class.

It is quite cheap to implement the QueueConsumer on the library side, so why not?

@benjamin-hodgson
Owner

  1. From the article you linked:
channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new QueueingConsumer(channel) {
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // consumer has been cancelled unexpectedly
    }
};
channel.basicConsume(queue, consumer);

This is more-or-less exactly the API I'm proposing.

  2. on_error would be called in addition to an on_connection_lost callback in the Connection class.

  3. It may be easy to implement, but public APIs require thought and careful design. I want something which fulfils as many use-cases as possible while maintaining compatibility. I'm not against providing the above QueuingConsumer as a useful class in the library, but I want users to be able to write it themselves if they need something specialised.

@TarasLevelUp
Author

  1. We should add the API after we add support for Rabbit cancel notification. Because it's so similar, it could be mistaken for a full implementation. Currently it is only called if the client itself cancels the consumer.

How about we add a QueuingConsumer that will be the default behaviour when no callback is passed to queue.consume? And if we want, we can pass a consumer instance to queue.consume, which can have an on_error callback. I really don't want users to have to create the QueuingConsumer object themselves. This looks bad to me:

from asynqp import QueuingConsumer
queue = yield from connect_and_declare_queue()
qc = QueuingConsumer()
queue.consume(qc)
yield from qc.get()

@benjamin-hodgson
Owner

It looks pretty idiomatic to me. What don't you like about it in particular?

@TarasLevelUp
Author

This one's easier, isn't it:

queue = yield from connect_and_declare_queue()
qc = queue.consume(no_ack=True)
yield from qc.get()

And I don't want to expose the class as part of Public API. Reasons:

  1. I want it to evolve into:
async def start():
    queue = await connect_and_declare_queue()
    async for msg in queue.consume(no_ack=True):
        # Process message

in Python 3.5.
  2. Maybe add auto-reconnect later.

So I want a simple API that will not break backward compatibility later. If we expose a class, people will subclass it and things might break after the next release.

@anton-ryzhov
Contributor

I'm +1 to this ticket. I agree that throwing exceptions to eventloop is pretty ugly.

I've looked through your suggestions about the interface, and I have something to offer too.

  1. Add error_callback as a parameter:
def consume(callback, *, error_callback=None, no_local=False, no_ack=False, exclusive=False, arguments=None)
  2. Send an exception object instead of an IncomingMessage object to the current callback (breaks compatibility with older callback handlers).

@benjamin-hodgson
Owner

Honestly, I still prefer the Consumer class.

queue = yield from connect_and_declare_queue()
qc = queue.consume(no_ack=True)
yield from qc.get()
async def start():
    queue = await connect_and_declare_queue()
    async for msg in queue.consume(no_ack=True):
        # Process message

With both of these proposals, the interface of consume depends on the arguments you passed in to it. This makes for an inconsistent API. In the first example, if you pass no callback then it returns you something from which you can get messages; if you do pass a callback then you just get something you can cancel (but calling get on that object doesn't make sense). The second proposal is even stranger - if you don't pass in a callback then you get an async iterable back. (How do you cancel such a consumer?)

With the QueuingConsumer, consume always takes a callback and always returns something you can use to cancel it; you can optionally turn on extra features such as error handling by adding methods to the callback object.


def consume(callback, *, error_callback=None, no_local=False, no_ack=False, exclusive=False, arguments=None)

This is not a bad idea; it's roughly isomorphic to the Consumer object I'm proposing. (Your proposal is to pass multiple functions; in the other option you effectively pass a record of functions.) How do we decide between the two? The Consumer object works best when the on_error callback naturally belongs as part of the same object as the message callback. The error_callback proposal works best when the two functions are unrelated. Which of these two use-cases is the more common?

Note that you can fairly straightforwardly convert between the two interfaces:

# one way
class ConsumerAdapter:  # or something
    def __init__(self, callback, error_callback):
        self._callback = callback
        self._error_callback = error_callback
    def __call__(self, *args, **kwargs):
        return self._callback(*args, **kwargs)
    def on_error(self, *args, **kwargs):
        return self._error_callback(*args, **kwargs)

queue.consume(ConsumerAdapter(my_callback, my_error_callback))
# the other way
queue.consume(my_consumer, error_callback=my_consumer.on_error)

So the question is, which do we want to be more awkward - the case when your two callbacks are unrelated (you have to wrap them in an object) or the case when your two callbacks naturally form an object (you have to unpack the methods and pass them in)?

I suppose there are some secondary concerns - how many arguments to consume is too many? (If we're going to end up with like 10 different callback arguments then the argument list of consume will be unmanageable.) How likely is it that people will want to subclass a given implementation of Consumer?

Intuition tells me that the Consumer object is the way to go here but I'm open to other arguments in favour of the error_callback idea.


Send exception object instead of IncomingMessage object to current callback (breaks compatibility with older callback handlers).

I don't like this because it requires users to do a run-time type-test of the callback's argument every time. This code will be duplicated in every consumer:

def my_consumer(x):
    if isinstance(x, Exception):
        pass  # handle the error
    else:
        pass  # process the message

@TarasLevelUp
Author

Hi again.
@anton-ryzhov Thanks for your interest.
@benjamin-hodgson What I don't like is the callback ideology of the consumer. I understand that all AMQP libraries do that, but it's not the asyncio way of handling things. Look at the aiozmq, aiopg and aiohttp libraries - they don't operate on callbacks. Asyncio was written to allow asynchronous code to be written in a synchronous way. What I would like to do is add synchronous behaviour to consuming.

I understand that people are using the library and we don't want to break that, but I think working with something that can raise an exception is way better than working with 2 or more callbacks: on_error, on_cancel, callback.

About your concerns on the interface of consume - how about making the synchronous consumer a separate coroutine, say create_consumer?

As for the 2nd proposal I meant to say:

async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:
        # Process message

Missed the await call. It's the same interface as with .get(), just added async for support.

What is good about this behaviour is that we will have normal Python exceptions propagated through the code. Let's say:

@asyncio.coroutine
def consume(connection):
    queue = yield from declare_queue(connection)
    consumer = yield from queue.create_consumer(no_ack=True)
    while True:
        try:
            msg = yield from consumer.get()
        except asynqp.Cancelled:
            break
        # Process message
    # Somewhere else in code
    consumer.cancel()

connection = yield from connect()
while True:
    try:
        yield from consume(connection)
    except asynqp.Disconnected:
        connection = yield from reconnect()

# We can finalize other code here, as we clearly know we are finished here.
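
For readers on current Python, the exception-based reconnect loop above could be sketched with `async`/`await` as follows. `Disconnected` here is a local stand-in for the proposed `asynqp.Disconnected`, and `consume_with_reconnect` and its `max_attempts` limit are assumptions added for illustration (the original loops forever).

```python
import asyncio


class Disconnected(Exception):
    """Hypothetical error type standing in for asynqp.Disconnected."""


async def consume_with_reconnect(connect, consume, max_attempts=5):
    """Call `consume(connection)`, reconnecting when it raises Disconnected.

    Returns the number of connections that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        connection = await connect()
        try:
            await consume(connection)
        except Disconnected:
            continue  # connection dropped: loop round and reconnect
        return attempt  # consume() finished normally
    raise Disconnected("gave up after %d attempts" % max_attempts)
```

Because the failure arrives as an ordinary exception, the retry policy lives in one place and the code after the loop only runs once consuming has really finished.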

How would you implement clean reconnect code with callbacks? I came up with:

@asyncio.coroutine
def process_message(msg):
    # Process message
    pass

@asyncio.coroutine
def on_error(error):
    if isinstance(error, asynqp.Disconnected):
        connection = yield from reconnect()
        asyncio.async(consume(connection))
    else:
        # We can finalize other code here, I think...
        pass

@asyncio.coroutine
def consume(connection):
    queue = yield from declare_queue(connection)
    # Let's even say we can pass coroutines in the ConsumingAdapter
    consumer = yield from queue.consume(
        ConsumingAdapter(process_message, on_error), no_ack=True)
    # Somewhere else in code
    consumer.cancel()

connection = yield from connect()
asyncio.async(consume(connection))

@TarasLevelUp
Author

About use cases of async for - I think there are a lot of cases where you don't need to call cancel from another block of code. It's ok to let it cancel after you leave the async for block. It's not that hard to do (say, with a __del__ method, which works well in 3.5+). We can also always cancel the coroutine:

async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:
        # Process message

consuming_task = asyncio.async(start())
# Somewhere else
consuming_task.cancel()  # We cancel the task here, not our consumer

And we can also leave the cancel method if that is a concern for the interface, or if you really hate to use __del__ for closing the consumer:

async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:  # This will just stop after consumer is cancelled
        # Process message
    # Somewhere else in another coroutine
    await consumer.cancel()

We can write the same use case in Python 3.4 using the get() function. It is the same proposal I described above:

@asyncio.coroutine
def start():
    queue = yield from connect_and_declare_queue()
    consumer = yield from queue.create_consumer(no_ack=True)
    while True:
        try:
            msg = yield from consumer.get()
        except asynqp.Cancelled:
            break
        # Process message
    # Somewhere else in code
    yield from consumer.cancel()

@anton-ryzhov
Contributor

@benjamin-hodgson

The error_callback proposal works best when the two functions are unrelated. Which of these two use-cases is the more common?

It's a pretty common use case: the callback handles messages, one handler per consumer, and one error handler per connection tries to reconnect.

# one way
class ConsumerAdapter:  # or something

It's too big to reimplement for every consumer. I'd prefer such a thing to stay inside the library, not be part of its interface.

queue.consume(ConsumerAdapter(my_callback, my_error_callback))

It doesn't change anything except adding extra characters to type.

@anton-ryzhov
Contributor

@TarasLevelUp
I'm +1 for the asyncio callback-less style; raising is better than a bunch of callbacks.

@benjamin-hodgson
Owner

@anton-ryzhov I agree that ConsumerAdapter should be provided as part of the library - the question is which of the two APIs is more convenient? It seems usual to me that the message callback and the error callback might need to access the same resources, so putting them in a class seems natural. On the other hand, it's quite trivial (though maybe not obvious) to convert an object into a set of callbacks; going the other way requires some boilerplate which I'd have to maintain.


It's not a binary choice between callbacks and async. It's trivial to convert between the two styles, so at its core the choice we make is arbitrary. Do you want to create an object, pass it in, and async for through that, or do you want to async for over subscribe directly?

Again, it's a question of which is more convenient. Both options seem quite natural to me, though I do agree that async for msg in subscribe(...) wins on simplicity. On the other hand, given that we're targeting Python 3.4+ (not 3.5 only), and we want to maintain backwards compatibility as much as is reasonable, the callback style has its own advantages.

Ultimately, what I value is a simple and consistent API. By 'consistent' I mean consistency within the API, consistency between versions of asynqp and consistency between versions of Python. I don't mind doing extra work in the library to achieve that goal. It's just not clear to me which solution maximises both the 'simple' and 'consistent' constraints. Your collective feedback is very helpful to me in that respect.


Also, @TarasLevelUp:

It's ok to let it cancel after you leave the async for block.

You will never leave the async for block unless you cancel the consumer. A consumer will keep receiving messages for ever, waiting an indefinite amount of time between each one. Hooking into the cancellation of the Task by overriding __del__ seems like a recipe for disaster to me... When a finaliser gets called is implementation-specific; clients of classes with finalisers inevitably run into unexpected problems; and in any case cancelling the Task is not the same as cancelling the consumer.

Looking at your suggested alternative API:

consumer = await queue.create_consumer(no_ack=True)
async for msg in consumer:  # This will just stop after consumer is cancelled
    # Process message

# Somewhere else in another coroutine
await consumer.cancel()

To me, this scarcely looks different than what I'm proposing:

consumer = QueuingConsumer()
await queue.consume(consumer)
async for msg in consumer:  # This will just stop after consumer is cancelled
    # Process message

# Somewhere else in another coroutine
await consumer.cancel()

... but I believe my proposal to be more consistent, as outlined above.

@anton-ryzhov
Contributor

@benjamin-hodgson

ConsumerAdapter should be provided as part of the library… the message callback and the error callback… putting them in a class seems natural.

Am I right, I (library user) have to put it together by myself. Create some class with handlers, instantiate it with some external links to my application; and then send it to aiomysqls API. Too much extra work, too much extra things to create. It stinks like Java)

the message callback and the error callback might need to access the same resources

But what if not the same? Users will have to join into one class non-linked things.

On the other hand, given that we're targeting Python 3.4+ (not 3.5 only), and we want to maintain backwards compatibility as much as is reasonable, the callback style has its own advantages.

What problems do you see with 3.4 compatibility? All async/await stuff is just syntax sugar around yield from things. And library has to support yield from style without async/await; but keeping in mind that it can be used with 3.5.
3.4 users have to live without the syntax sugar until they upgrade to 3.5. And this upgrade will be easier if they only have to fix the syntax of some constructions, not the whole logic.

... but I believe my proposal to be more consistent, as outlined above.

The only difference I see between these examples is that the second one makes me import QueuingConsumer from somewhere and implement it somehow, while the first one does everything for me. I'm choosing simplest way.

Talking about finalization and task cancellation, I believe it should be used like that:

consumer = await queue.create_consumer(no_ack=True)
try:
    async for msg in consumer:  # This will just stop after consumer is cancelled
        pass  # Process message
finally:
    # Create the consumer before `try` so it is always bound here
    await consumer.cancel()


# Somewhere else in another coroutine
await consumer.cancel() # will naturally stop iteration, finally will do nothing
# or
coro.cancel() # will raise CancelledError inside coro, finally will do cleanup
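
A small runnable sketch of the second path (cancelling the task), assuming an `asyncio.Queue` as a stand-in for the consumer object. `consume_all` and `demo` are hypothetical names; the point is only that `finally` runs when `CancelledError` is raised inside the coroutine.

```python
import asyncio


async def consume_all(queue, log):
    """Drain `queue` forever; always record cleanup on the way out."""
    try:
        while True:
            log.append(await queue.get())
    finally:
        # Runs on normal exit and when the task is cancelled.
        log.append("consumer cancelled")


async def demo():
    queue = asyncio.Queue()  # stand-in for a consumer object
    log = []
    task = asyncio.ensure_future(consume_all(queue, log))
    queue.put_nowait("msg-1")
    await asyncio.sleep(0.01)  # let the task process the message
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log
```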

@benjamin-hodgson
Owner

I (library user) have to put it together by myself. Create some class with handlers, instantiate it with some external links to my application; and then send it to aiomysqls API. Too much extra work, too much extra things to create.

You have to write that code somewhere.

But what if not the same? Users will have to join into one class non-linked things.

I've already explained that it's straightforward for asynqp to provide an adapter class. The two proposals are entirely equivalent in terms of power; it's just a question of convenience and natural-ness. Which is more convenient more of the time?

All async/await stuff is just syntax sugar around yield from things.

At its core, yes, but you don't get things like async for for free. I'd have to implement __aiter__ and __anext__ alongside a 3.4-compatible version of the same API.
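
To make the extra work concrete, here is a hedged sketch of one object offering both a `get()` coroutine and the `__aiter__`/`__anext__` protocol on top of it. `SketchConsumer`, `Cancelled`, `deliver` and the sentinel-based `cancel` are hypothetical, not asynqp's API; the 3.4-compatible half would be the same `get()` written with `yield from`.

```python
import asyncio


class Cancelled(Exception):
    """Hypothetical error raised by get() once the consumer is cancelled."""


class SketchConsumer:
    _CANCEL = object()  # sentinel queued by cancel()

    def __init__(self):
        self._queue = asyncio.Queue()

    def deliver(self, msg):
        self._queue.put_nowait(msg)

    def cancel(self):
        self._queue.put_nowait(self._CANCEL)

    async def get(self):
        item = await self._queue.get()
        if item is self._CANCEL:
            raise Cancelled()
        return item

    def __aiter__(self):
        return self

    async def __anext__(self):
        # async for support is a thin wrapper around get().
        try:
            return await self.get()
        except Cancelled:
            raise StopAsyncIteration


async def demo():
    consumer = SketchConsumer()
    consumer.deliver("a")
    consumer.deliver("b")
    consumer.cancel()
    return [msg async for msg in consumer]
```

In this sketch, a `get()` issued after cancellation would block, since the sentinel is consumed once; a real implementation would need to decide how to handle that.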

I'm choosing simplest way.

An idea which seems simple can often have unexpected consequences or interactions with other parts of a system. I've already explained why I am concerned about inconsistency.

Talking about finalization and task cancellation

Your try/finally idea looks OK to me. What are the consequences of forgetting to use a try/finally? You'll end up with an un-cancelled consumer that does nothing upon receiving a message, though I suppose you'd probably be able to end up in that situation however you slice it.

@anton-ryzhov
Contributor

What are the consequences of forgetting to use a try/finally

It's no worse than the current callback implementation, where it is much easier to forget about cancellation.

@benjamin-hodgson
Owner

I feel like we're not making much progress here. To achieve some clarity, let me enumerate some of the proposals and my thoughts on the pros and cons of each one, with regard to the common use case of queuing up messages in memory and getting them "synchronously".

1. error_callback parameter to consume

def my_consumer(msg):
    ...
def my_error_callback(exc):
    ...
queue.consume(my_consumer, error_callback=my_error_callback)

Pros:

  • Backwards compatible
  • Consistent with existing callback style
  • Simple when people don't care about errors
  • Works best when error_callback and my_consumer don't naturally form an object

Cons:

  • Easy to forget error_callback - then there's no way to be notified of an error
  • Not obvious how to unpack an object into two callbacks, when the two functions do naturally form an object. (An example of such a case would be QueuingConsumer.)
    • This makes the common use case quite unnatural - create a QueuingConsumer, pass in the methods individually, then yield from the thing you just passed in.

2. on_error method on callback object

class MyConsumer:
    def __call__(self, msg):
        ...
    def on_error(self, exc):
        ...
c = MyConsumer()
queue.consume(c)
# then...
yield from c.get()

asynqp would provide some standard consumer classes like QueuingConsumer and ConsumerAdapter.

Pros:

  • Backwards compatible
  • Simple when people don't care about errors
  • Works best when message and error actions do form an object (QueuingConsumer would be simple to use - just create it and pass it in, rather than passing in the methods separately.)
  • Natural to add support for yield from or async for consuming strategy (eg QueuingConsumer)
  • Impossible to ignore errors when using a QueuingConsumer - they will be thrown into your coroutine.
  • Can subclass standard classes, if you just need to make minor adjustments to existing behaviour

Cons:

  • __call__ method on consumer object not obvious
    • Variation: don't use __call__, use a different name such as on_msg.
    • Not backwards compatible; breaks the common case of passing in a function
      • It could be implemented backwards-compatibly by looking for on_msg or __call__ in the implementation of consume, but we'd have to set up rules of what takes precedence and it feels a little weird
    • Could supply a base class to convert from one form to the other
    • Rather Java-esque
  • Marginally more boilerplate when you want to use separate functions as the callbacks (queue.consume(ConsumerAdapter(my_callback, my_error_handler)))
  • Marginally more boilerplate when using the QueuingConsumer
  • Cancelling a consumer is not natural - you yield from the thing you pass in, but cancel the thing you get back.

3. consume creates a consumer object when you don't pass in a callback

c = MyConsumer()
queue.consume(c)
# or...
c = queue.consume()
# then...
yield from c.get()

Under this proposal, calling consume without a callback argument would create and return a QueuingConsumer

Pros:

  • Backwards compatible
  • Minimal boilerplate when you don't want to use a QueuingConsumer.
  • Could be combined with proposals 1 or 2.
  • When not using a callback, impossible to ignore errors. They will always be thrown as exceptions into your coroutine.

Cons:

  • Weird, weird inconsistencies. The signature of the consume method is not stable, as outlined above
    • This makes this option a non-starter in my opinion
  • Special cases, and extra code, to support both options, rather than one design with flexibility built in.

4. consume always creates a QueuingConsumer - no callbacks

This is effectively proposal 3 without the option of passing in a callback. This solves the issue of API consistency.

c = queue.consume()
# then...
yield from c.get()

Pros:

  • Common case is compelling and Pythonic - especially when you don't need to cancel the consumer: async for msg in queue.consume().
  • Always impossible to ignore errors - they will be thrown as exceptions into your coroutine.

Cons:

  • Backwards incompatible
  • Opinionated - no way to customise the behaviour of the consumer. (What do you do if you don't want to use a QueuingConsumer?)

Conclusion

None of these options preclude an async for-based API. The first two build it out of a lower-level callback-based API; the latter two build it in.

In my opinion, Option 2 strikes the best balance, even though it's a bit strange to cancel a different object than the one you yield from. Option 4 is a strong contender too, but it has problems with backwards compatibility and has no provisions for customisation; there's no way to drop down to a lower level if you need to do something unusual.

Do you have any more suggestions, or can you think of any other pros and cons of these options? If not, then consider the decision made.

@tvoinarovskyi
Contributor

Hello again
The 4 cases above don't cover my last proposal, so let me describe it here in the same manner as the above:

5. We leave consume as backward-compatible function and introduce create_consumer without callbacks

This is effectively proposal 4, but we leave consume in place. This solves the backward-incompatibility issue.

c = queue.create_consumer()
# then...
yield from c.get()

Pros:

  • Always impossible to ignore errors - they will be thrown as exceptions into your coroutine.
  • We can deprecate consume if we think that is appropriate.
  • It uses the same object for cancellation: c.close()
  • Compared to proposal 3, we don't create the object ourselves, so we never have the issue of calling c.close() before a consumer has actually started
  • We can provide very clean and descriptive documentation on this method. In all other proposals we will need to describe it inside 1 big description of consume. Even now it is very confusing.

Cons:

  • Users of the callback style can not make their own QueuingConsumer by subclassing one.
  • Users can't do something unusual

@tvoinarovskyi
Contributor

In this separate comment I will describe why I care so much about this.
I tried to implement proposal 2: this commit
The code is quite simple, but I have big problems with the documentation:

  • We call the result of consume a consumer. With this in mind, it is hard to explain why we pass in a QueuedConsumer and receive another consumer as a result. Moreover, those are different objects, and the second one is used only for cancelling.
  • QueuedConsumer requires a loop to create tasks, so the example is somewhat more extensive:
consumer = QueuedConsumer(loop=loop)
handle = yield from self.queue.consume(consumer, no_ack=True)
msg = yield from consumer.get()
# Somewhere else
yield from handle.close()
  • The lower level of proposal 2 is hard to explain. If we give QueuedConsumer as an example, its code is hard for users to follow. Look for yourselves in the commit above, and that is just a one-hour implementation; I think it will get even bigger once we optimise for speed and cover all the cases.

I think that by trying to combine 2 interfaces into 1, we will make it hard for users to understand how to use it. It's quite easy to describe with proposal 5, as it is a new, simple, straightforward method without class inheritance, callbacks or adapters.

there's no way to drop down to a lower level if you need to do something unusual.

@benjamin-hodgson Why would you want to do something unusual? Do you have any examples of use cases? I can't come up with any :(

@benjamin-hodgson
Owner

I would be amenable to implementing create_consumer in addition to a lower-level API. Then create_consumer would basically be a convenience function that creates, subscribes and returns the consumer.
I'm also up for renaming the existing Consumer task to (something like) CancellationToken. I don't expect this to result in any broken code.
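As a rough illustration of create_consumer as a convenience layer over a callback-based consume, here is a sketch under stated assumptions. `FakeQueue` and `FakeHandle` are hypothetical stand-ins for the broker-side objects, `QueuedConsumerSketch` is not the class from the commit mentioned above, and this variant returns a single object that holds the cancellation handle internally.

```python
import asyncio


class FakeHandle:
    """Hypothetical cancellation handle returned by a callback-style consume()."""

    def __init__(self):
        self.cancelled = False

    async def cancel(self):
        self.cancelled = True


class FakeQueue:
    """Hypothetical stand-in for a broker queue with a callback-based consume()."""

    def __init__(self):
        self.callback = None
        self.handle = None

    async def consume(self, callback):
        # Remember the callback so the demo can simulate a delivery.
        self.callback = callback
        self.handle = FakeHandle()
        return self.handle


class QueuedConsumerSketch:
    def __init__(self):
        self._messages = asyncio.Queue()
        self._handle = None

    async def get(self):
        return await self._messages.get()

    async def close(self):
        # Cancelling goes through the same object the caller reads from.
        await self._handle.cancel()


async def create_consumer(queue):
    # Create, subscribe and return the consumer in one step.
    consumer = QueuedConsumerSketch()
    consumer._handle = await queue.consume(consumer._messages.put_nowait)
    return consumer


async def demo():
    queue = FakeQueue()
    consumer = await create_consumer(queue)
    queue.callback("hello")  # simulate the broker delivering a message
    msg = await consumer.get()
    await consumer.close()
    return msg, queue.handle.cancelled
```

The lower-level consume stays available for anyone who needs a custom callback, while the common case only ever touches one object.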

@tvoinarovskyi
Contributor

If we do make create_consumer, you understand that it will return 1 object, not 2. So cancellation will work as in proposal 5.

@benjamin-hodgson
Owner

It can just return 2 objects.

cancellation_token, consumer = yield from queue.create_consumer(...)

@anton-ryzhov
Contributor

I'm +1 for the fifth proposal.

I didn't get what CancellationToken is and why it should return 2 objects.

Let me remind you that throwing exceptions inside connection_lost is ugly. I would expect the library not to do that, at least when the new API is used.

@tvoinarovskyi
Contributor

It can just return 2 objects.

cancellation_token, consumer = yield from queue.create_consumer(...)

Why do you want create_consumer to be built on top of consume? It's ugly to return 2 objects when you expect 1. What I want from a library is a simple API, and I think most users will want that too.

The implementation is simple if we don't make QueuedConsumer part of the public API. And, to repeat myself, I think it's a good idea NOT to make it part of the public API: it's too complex to be used as an example, and it will get more complex in the future.

@tvoinarovskyi
Contributor

I made some progress on the issue in my branch, and now I can see another reason to have a separate API for QueuedConsumer.
The logic for no_ack=True and no_ack=False is different:

  • With no_ack=True we must wait for all pending messages to be processed, as they have already been removed from the queue.
  • With no_ack=False we need to purge all pending messages on errors, as they can be neither ack'ed nor reject'ed afterwards.
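The two policies can be sketched with a small in-memory buffer. This is only an illustration of the logic described above, not code from the branch; `PendingBuffer`, `deliver`, `fail` and `drain` are hypothetical names, and the buffer is synchronous to keep the example short.

```python
from collections import deque


class PendingBuffer:
    """Hypothetical buffer showing the two error policies for pending messages."""

    def __init__(self, no_ack):
        self.no_ack = no_ack
        self._pending = deque()
        self._error = None

    def deliver(self, msg):
        self._pending.append(msg)

    def fail(self, exc):
        self._error = exc
        if not self.no_ack:
            # These messages can no longer be acked or rejected, so drop them.
            self._pending.clear()

    def get_nowait(self):
        # Buffered messages are handed out first; the error is raised afterwards.
        if self._pending:
            return self._pending.popleft()
        if self._error is not None:
            raise self._error
        raise LookupError("no message buffered")


def drain(buffer):
    """Collect everything the consumer would still see before the error."""
    seen = []
    try:
        while True:
            seen.append(buffer.get_nowait())
    except ConnectionError:
        return seen


auto_acked = PendingBuffer(no_ack=True)
manually_acked = PendingBuffer(no_ack=False)
for buf in (auto_acked, manually_acked):
    buf.deliver(1)
    buf.deliver(2)
    buf.fail(ConnectionError("connection lost"))
```

With no_ack=True the two buffered messages are still handed out before the error is raised, because the broker has already forgotten them. With no_ack=False they are dropped locally; the broker still holds them as unacknowledged and can redeliver them later.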

@tvoinarovskyi
Contributor

In a separate branch I created an implementation of the fifth proposal, with examples for Python 3.4 and Python 3.5.
A note on the Python 3.5 implementation:

  • I wrapped the queued_consumer in an async context manager. This lessens the problem of forgetting to call consumer.cancel().
    async with self.queue.queued_consumer() as consumer:
        async for msg in consumer:
            pass
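One way such a wrapper could work is sketched below. This is an assumption about the shape of the implementation, not the code from the branch: `ConsumerSketch` is a hypothetical class that serves messages from a list and cancels itself in `__aexit__`, whether or not the body raised.

```python
import asyncio


class ConsumerSketch:
    """Hypothetical consumer usable with both `async with` and `async for`."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.cancelled = False

    async def cancel(self):
        self.cancelled = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and on errors alike.
        await self.cancel()
        return False  # do not swallow the application's exception

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._messages:
            raise StopAsyncIteration
        return self._messages.pop(0)


async def demo():
    consumer = ConsumerSketch(["a", "b"])
    seen = []
    try:
        async with consumer:
            async for msg in consumer:
                seen.append(msg)
                if msg == "b":
                    raise RuntimeError("application error")
    except RuntimeError:
        pass
    return seen, consumer.cancelled
```

Even though the loop body fails on the second message, the consumer ends up cancelled, and the exception still propagates to the caller.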

@benjamin-hodgson
Owner

Good idea! Nice way to ensure the consumer gets cancelled in the event of an application error.

@benjamin-hodgson benjamin-hodgson added this to the v0.6 milestone May 1, 2016