
[azure.servicebus] Receiving messages with lock that has already expired #6178

Closed · JosPolfliet opened this issue Jun 30, 2019 · 17 comments
Labels: Client (This issue points to a problem in the data-plane of the library.), Service Bus

@JosPolfliet commented Jun 30, 2019

In our application logs, we see that we sometimes receive messages where the lock is already expired.
This happens both when iterating the receiver with the native next() call and when using a loop that calls fetch_next while explicitly disabling prefetching by setting max_batch_size=1.

Usually we get something like this

2019-06-30 14:27:57 DEBUG    adp        Message locked until: 2019-06-30 14:28:27.836000

but sometimes we get expired messages

2019-06-30 14:22:57 DEBUG    adp        Message locked until: 2019-06-30 14:21:24.143000

As far as I can tell, this never happens for the first message, but if the first message takes a long time to process (e.g. longer than the lock duration would allow, had the later messages been locked at the same time as the first), it does sometimes happen for subsequent messages.

Relevant code:

queue_client = self.client.get_queue(queue)
with queue_client.get_receiver() as receiver:
    while True:
        for message in receiver.fetch_next(max_batch_size=1):
            self.logger.info(f"Processing new message from {queue}")
            self.logger.debug(f"Message locked until: {message.locked_until}")
            # Do long running stuff.

What could be the problem here? Are we doing something wrong?

Relevant pip freeze output

azure-common==1.1.22
azure-keyvault==1.1.0
azure-nspkg==3.0.2
azure-servicebus==0.50.0
azure-storage==0.36.0
@annatisch (Member)

Thanks @JosPolfliet!
The connection actually pulls a number of messages into a local cache, and I suspect what you're seeing here is that, while the first message takes some time to process, subsequent messages that were cached locally expire in the meantime.

You can alter the number of "cached" messages by setting the "prefetch" value on "get_receiver":

messages = queue_client.get_receiver(prefetch=1)
for message in messages:
    message.complete()

@JosPolfliet (Author)

Thank you very much for your reply.

From both the docs and the source code, prefetch=0 appears to be the default, in which case no prefetching should happen at all. So the problem might not be related to prefetching, because that is already the original case?

Per your suggestion, I tried with prefetch=1, but it seems to me that this only makes the problem worse, since then we are prefetching at least 1 message? For context, the processing code I'm running typically takes between a couple of minutes and 1 hour.

@alvercau commented Jul 9, 2019

Hi

I am a colleague of @JosPolfliet working on this issue. I set prefetch=1 and, as anticipated, this did not have any impact. An extract from the logs is below. As you can see, processing the first message takes less than two minutes, yet the messages received afterwards have already expired. The time to live is set to 10675199 days, so the expiry cannot be due to the time to live being reached. Is there anything else we can check?

2019-07-09 09:11:52 INFO adp Downloading new messages from preprocessrequest
2019-07-09 09:11:52 INFO adp Processing new message from preprocessrequest .
2019-07-09 09:11:52 DEBUG azure.servicebus.common.utils Running lock auto-renew thread for 3600 seconds .
2019-07-09 09:11:52 DEBUG azure.servicebus.common.utils 30 seconds or less until lock expires - auto renewing.
2019-07-09 09:11:52 DEBUG adp Message locked until: 2019-07-09 09:12:52.290000 .
2019-07-09 09:12:22 DEBUG azure.servicebus.common.utils 30 seconds or less until lock expires - auto renewing.
2019-07-09 09:12:52 DEBUG azure.servicebus.common.utils 30 seconds or less until lock expires - auto renewing.
2019-07-09 09:13:22 DEBUG azure.servicebus.common.utils 30 seconds or less until lock expires - auto renewing.
2019-07-09 09:13:35 DEBUG adp Finished processing. Marking message as complete .
2019-07-09 09:13:35 DEBUG adp Closing AutoLockRenew .
2019-07-09 09:13:35 INFO adp Downloading new messages from preprocessrequest .
2019-07-09 09:13:35 INFO adp Processing new message from preprocessrequest .
2019-07-09 09:13:35 WARNING adp Received an expired message so skipping it. Was locked until: 2019-07-09 09:12:47.273000. Message contents: {"filePath": "19331272_3_aanbod_pdf.pdf", "fileId": "5d243f7df1d22433e26a474c", "uploadId": "5d243f7df1d22433e26a474c", "organisationId": "5d11ee7807a226ae21ec504f"} .
2019-07-09 09:13:35 INFO adp Downloading new messages from preprocessrequest .
2019-07-09 09:13:35 INFO adp Processing new message from preprocessrequest .
2019-07-09 09:13:35 WARNING adp Received an expired message so skipping it. Was locked until: 2019-07-09 09:12:52.290000. Message contents: {"filePath": "19331046_7_aanvraag_pdf.pdf", "fileId": "5d243f93f1d22433e26a475e", "uploadId": "5d243f93f1d22433e26a475e", "organisationId": "5d11ee7807a226ae21ec504f"}
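
(For context, the kind of setup that produces the auto-renew lines above looks roughly like this. This is a minimal sketch assuming the 0.50.x AutoLockRenew helper from azure.servicebus.common.utils that the log lines come from; the connection string is a placeholder and the long-running work is elided.)

from azure.servicebus import ServiceBusClient
from azure.servicebus.common.utils import AutoLockRenew  # the helper seen in the log lines above

client = ServiceBusClient.from_connection_string("<connection string>")  # placeholder
queue_client = client.get_queue("preprocessrequest")
renewer = AutoLockRenew()

with queue_client.get_receiver(prefetch=1) as receiver:
    for message in receiver:
        # Keep renewing the PeekLock for up to an hour while the long-running work runs.
        renewer.register(message, timeout=3600)
        # ... long-running processing ...
        message.complete()

renewer.shutdown()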

@rswgnu commented Sep 5, 2019

I am seeing something similar but where the receiver times out and closes after 5 or more minutes (using version 0.50.1 under WSL 1 with Debian). The documentation says that the default is that the receiver stays open until closed since the idle timeout is 0 by default. But that is not what we experience when using ReceiveAndDelete mode.

You really need to provide some samples where a receiver gets one message from the queue, say, each hour, or over a long period that exceeds some of the timeouts in your code. The samples I have seen never deal with this. There is also very little documentation on using ReceiveAndDelete mode from Python.
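
(For reference, the kind of long-idle ReceiveAndDelete receiver being described would look roughly like this. This is a sketch assuming the 0.50.x get_receiver keywords mode and idle_timeout as documented; the connection string and queue name are placeholders.)

from azure.servicebus import ServiceBusClient, ReceiveSettleMode

client = ServiceBusClient.from_connection_string("<connection string>")
queue_client = client.get_queue("test")  # placeholder queue name

# idle_timeout=0 is documented as "stay open until explicitly closed";
# ReceiveAndDelete removes messages on delivery, so there is no lock to expire or renew.
with queue_client.get_receiver(mode=ReceiveSettleMode.ReceiveAndDelete, idle_timeout=0) as receiver:
    for message in receiver:
        print(str(message))  # messages may arrive hours apart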

@JosPolfliet (Author)

It's not exactly hard to recreate either:

from azure.servicebus import ServiceBusClient, Message
import time

CONNECTION_STRING = "" # SET YOUR LOCK DURATION OF THIS QUEUE TO 30seconds
QUEUE_NAME = "test"

sb_client = ServiceBusClient.from_connection_string(CONNECTION_STRING)
queue = sb_client.get_queue(QUEUE_NAME)

for i in range(3):
    queue.send(Message(f"Message {i}"))

messages = queue.get_receiver()
for message in messages:
    print(message)
    print("Is expired? ", message.expired)
    message.complete()
    time.sleep(40)

@jfggdl added the Client label on Sep 17, 2019
@nicolasgere

Same issue here, and I haven't found any way to fix it.

@annatisch (Member)

Sorry for the delay in investigating this.
@JosPolfliet - if I run the code snippet that you posted above with debug tracing turned on and additionally print the message annotations, we see...

2019-09-17 17:51:21,111 uamqp.c_uamqp INFO     b'<- [TRANSFER]* {2,1,<AF 37 DD D7 44 C6 6D 42 96 E4 68 99 3E AA 8C 23>,0,NULL,false,NULL,NULL,NULL,NULL,true}'
2019-09-17 17:51:21,111 uamqp.c_uamqp INFO     b'-> [FLOW]* {3,65534,1,65535,2,1,1}'
Message 0
Annotations: {b'x-opt-enqueued-time': 1568767877805, b'x-opt-sequence-number': 1, b'x-opt-enqueue-sequence-number': 0, b'x-opt-locked-until': 1568767910712}
Is expired?  False
2019-09-17 17:51:21,133 uamqp.c_uamqp INFO     b'-> [DISPOSITION]* {true,1,1,true,* {}}'
2019-09-17 17:52:01,135 uamqp.c_uamqp INFO     b'<- [TRANSFER]* {2,2,<C2 E9 2D A4 C3 86 0C 45 9D F9 DD 89 D1 B2 CF 1A>,0,NULL,false,NULL,NULL,NULL,NULL,true}'
2019-09-17 17:52:01,135 uamqp.c_uamqp INFO     b'-> [FLOW]* {4,65533,1,65535,2,2,1}'
Message 1
Annotations: {b'x-opt-enqueued-time': 1568767878820, b'x-opt-sequence-number': 2, b'x-opt-enqueue-sequence-number': 0, b'x-opt-locked-until': 1568767910758}
Is expired?  True

This sequence seems to show the correct behaviour for prefetch=0 (the default): the transfer of the next message doesn't occur until 40 seconds after the first frame has been settled.
However, the next message arrives with annotations indicating that its lock has already expired, which is certainly strange.

@nemakam - are you able to provide any insight into why a message would arrive at the client where the 'locked-until' field has already expired?
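
(For anyone who wants to reproduce this trace: a minimal sketch of how the frame logging and annotation dump can be obtained, assuming the 0.50.x debug flag on from_connection_string and the wrapped uamqp message being exposed as message.message; the connection string and queue name are placeholders.)

import logging
import time

from azure.servicebus import ServiceBusClient

# INFO-level output from the uamqp loggers prints the raw TRANSFER/FLOW/DISPOSITION frames shown above.
logging.basicConfig(level=logging.INFO)

# debug=True switches on uamqp network tracing in the 0.50.x client.
sb_client = ServiceBusClient.from_connection_string("<connection string>", debug=True)
queue = sb_client.get_queue("test")

with queue.get_receiver() as receiver:
    for message in receiver:
        # The lock annotations (x-opt-locked-until etc.) live on the wrapped uamqp message.
        print("Annotations:", message.message.annotations)
        print("Is expired? ", message.expired)
        message.complete()
        time.sleep(40)  # sleep past the 30-second lock duration, as in the repro above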

@annatisch (Member)

  • @binzywu for input on potential service behaviour

@annatisch (Member)

Hi @JosPolfliet, @nicolasgere @alvercau - I have identified the root issue behind this and am working on a fix for it now.

@rswgnu commented Sep 27, 2019 via email

@alvercau commented Sep 27, 2019 via email

@annatisch (Member)

I have a PR open with a potential fix here:
Azure/azure-uamqp-python#96

And preview builds of uAMQP can be downloaded here:
https://dev.azure.com/azure-sdk/public/_build/results?buildId=119291

The fix for this issue requires no changes to the Service Bus SDK.
So it would be great if you could try out the updated uAMQP to see whether it resolves the issue for you.

@rswgnu commented Sep 27, 2019 via email

@annatisch (Member) commented Oct 7, 2019

This should now be resolved in the updated uAMQP (v1.2.3), which has just been released. You should be able to install with:

$ pip install --upgrade uamqp
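
To double-check which build actually gets picked up after upgrading, uamqp exposes a __version__ string:

import uamqp
print(uamqp.__version__)  # should print 1.2.3 or later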

@KieranBrantnerMagee (Member)

Hey folks, since we haven't heard of any recurring issues with this, closing this out. Don't hesitate to reopen if this problem is still occurring.

@blaketsk commented Sep 10, 2020

I'm still facing this issue. The tricky thing is that I can reproduce it only when receiving messages from a topic under high load.
My code is quite straightforward:

from datetime import datetime
import logging

log = logging.getLogger()  # root logger, matching the WARNING:root: output below

sub_client = sb_client.get_subscription(app_config.mq.topic_incoming, app_config.mq.subscription_incoming)
messages = sub_client.get_receiver(prefetch=0)

for message in messages:
    log.warning(str(datetime.now()) + ' Expiration time ' + str(message.locked_until))
    log.warning(str(datetime.now()) + ' Enqueued time ' + str(message.enqueued_time))

    timestamp = datetime.now()
    if (message.locked_until - timestamp).total_seconds() < 2:
        log.warning('Message lock is almost expired. Abandon')
        message.abandon()
        continue

    handle_message(message)
    message.complete()

    delta = datetime.now() - timestamp
    log.warning(str(datetime.now()) + ' MQ request processed in ' + str(delta.total_seconds() * 1000) + ' ms.')

Sample log output:
WARNING:root:2020-09-10 20:05:30.952249 Expiration time 2020-09-10 20:05:54.272000
WARNING:root:2020-09-10 20:05:30.952249 Enqued time 2020-09-10 13:04:45.504000
WARNING:root:2020-09-10 20:05:32.242848 MQ request processed in 1289.599 ms.
WARNING:root:2020-09-10 20:05:32.250885 Expiration time 2020-09-10 20:05:54.272000
WARNING:root:2020-09-10 20:05:32.250885 Enqued time 2020-09-10 13:04:45.504000
WARNING:root:2020-09-10 20:05:33.591005 MQ request processed in 1340.12 ms.
WARNING:root:2020-09-10 20:05:33.595064 Expiration time 2020-09-10 20:05:54.272000
WARNING:root:2020-09-10 20:05:33.595064 Enqued time 2020-09-10 13:04:45.504000
WARNING:root:2020-09-10 20:05:34.870063 MQ request processed in 1274.0549999999998 ms.
WARNING:root:2020-09-10 20:05:34.874085 Expiration time 2020-09-10 20:05:54.272000
WARNING:root:2020-09-10 20:05:34.874085 Enqued time 2020-09-10 13:04:45.504000
WARNING:root:2020-09-10 20:05:36.126068 MQ request processed in 1251.9830000000002 ms.
WARNING:root:2020-09-10 20:05:36.127068 Expiration time 2020-09-10 20:05:54.272000
WARNING:root:2020-09-10 20:05:36.127068 Enqued time 2020-09-10 13:04:45.504000
WARNING:root:2020-09-10 20:05:37.342069 MQ request processed in 1215.001 ms.
WARNING:root:2020-09-10 20:05:37.344069 Expiration time 2020-09-10 20:05:54.600000
WARNING:root:2020-09-10 20:05:37.344069 Enqued time 2020-09-10 13:04:45.504000

And the result is:

WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon
WARNING:root:Message lock is almost expired. Abandon

I tried to debug this and found that the '_received_messages' queue in uamqp/client.py fills up quickly (it can contain hundreds of messages) relative to how fast the messages are handled, and this does not depend on the specified 'prefetch'. On a topic with quite low load, everything works as expected.

One more interesting fact: enqueued_time is the same for these messages

azure-servicebus==0.50.3
uamqp==1.2.10
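
(A possible mitigation, though not a fix for the buffering itself: renew the lock when it is close to expiring instead of abandoning the message. A minimal sketch assuming the 0.50.x Message.renew_lock(); messages and handle_message are as in the snippet above.)

from datetime import datetime

for message in messages:
    # If the lock is close to expiring, try to renew it rather than abandoning the message.
    if (message.locked_until - datetime.now()).total_seconds() < 10:
        try:
            message.renew_lock()
        except Exception:
            # The lock was already lost - give the message back for redelivery.
            message.abandon()
            continue

    handle_message(message)
    message.complete()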

@Gerrit-K commented Aug 4, 2021

We're experiencing a similar issue with our Service Bus lately. We have multiple producers and multiple consumers running simultaneously, and we have observed occasions where the same message is delivered to a second consumer, but with an already expired lock (we check message._lock_expired - although it's internal, it does exactly what we would do ourselves otherwise). Interestingly, the time between the first and the second consumer receiving the message was nowhere near the configured lock duration (5 minutes; we're not using prefetch, btw), but it was also not instantaneous (usually at least a few seconds). To illustrate, here's one (partly anonymized) example with timestamps:

2021-08-02 18:28:20 | 2021-08-02 16:28:20,557 - INFO - producer-l66pc - MainProcess:MainThread - Sent "1007ebfc-bcc1-48ca-8077-f18e83d9009e"
2021-08-02 18:29:23 | 2021-08-02 16:29:23,777 - INFO - consumer-rr2k4 - MainProcess:MainThread - Received "1007ebfc-bcc1-48ca-8077-f18e83d9009e"
2021-08-02 18:29:24 | 2021-08-02 16:29:24,448 - INFO - consumer-rr2k4 - MainProcess:MainThread - Processed "1007ebfc-bcc1-48ca-8077-f18e83d9009e"
2021-08-02 18:30:02 | 2021-08-02 16:30:02,162 - INFO - consumer-w4pw4 - MainProcess:MainThread - Received "1007ebfc-bcc1-48ca-8077-f18e83d9009e"
2021-08-02 18:30:02 | 2021-08-02 16:30:02,165 - ERROR - consumer-w4pw4 - MainProcess:BusMonitor_0 - Error for "1007ebfc-bcc1-48ca-8077-f18e83d9009e": Message lock has expired

As you can see there, the second consumer receives the same message as the first one (way after it already got processed and completed) but immediately (~3ms) fails due to the expired lock.

We only experience this when our system is under heavy CPU load, but we have not been able to reproduce it artificially.

We're using azure-servicebus==7.3.1 and uamqp==1.4.1.

@KieranBrantnerMagee I know this isn't much information, but is there a chance to reopen or reassess this? If you need additional specific info or have an idea on how we might be able to reproduce this, please let me know!
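
(Not a root-cause fix, but one mitigation that may help when a consumer is starved for CPU: the v7 SDK can attach an AutoLockRenewer to the receiver so locks are kept alive during slow processing. A minimal sketch assuming azure-servicebus 7.3.x; the connection string and queue name are placeholders.)

from azure.servicebus import ServiceBusClient, AutoLockRenewer

# Renew each message lock for up to 5 minutes beyond the configured lock duration.
renewer = AutoLockRenewer(max_lock_renewal_duration=300)

with ServiceBusClient.from_connection_string("<connection string>") as client:
    with client.get_queue_receiver("<queue name>", auto_lock_renewer=renewer) as receiver:
        for msg in receiver:
            # ... process the message ...
            receiver.complete_message(msg)

renewer.close()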

@github-actions github-actions bot locked and limited conversation to collaborators Apr 12, 2023