
[Pub/Sub] Publisher stops publishing after RetryError Deadline of 600 exceeded error not being surfaced #7822

Closed
jam182 opened this issue Apr 30, 2019 · 6 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@jam182

jam182 commented Apr 30, 2019

Often, the publisher client stops working without surfacing the error; the stack trace below never reaches our code. The result is the application hanging without failing any health check. For now we have set up an alert for when we see this log in Stackdriver, but that is really bad.

Stacktrace

Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py", line 259, in monitor
    return self._commit()
  File "/usr/lib/python2.7/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py", line 207, in _commit
    self._messages,
  File "/usr/lib/python2.7/site-packages/google/cloud/pubsub_v1/gapic/publisher_client.py", line 398, in publish
    request, retry=retry, timeout=timeout, metadata=metadata)
  File "/usr/lib/python2.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/google/api_core/retry.py", line 270, in retry_wrapped_func
    on_error=on_error,
  File "/usr/lib/python2.7/site-packages/google/api_core/retry.py", line 199, in retry_target
    last_exc,
  File "/usr/lib/python2.7/site-packages/six.py", line 737, in raise_from
    raise value
RetryError: Deadline of 600.0s exceeded while calling <functools.partial object at 0x7f34dc9c9838>, last exception: 503 Connect Failed

OS type and version

Alpine 3.8

Python version and virtual environment information: python --version

2.7.14

pip freeze

bernhard==0.2.6
boto==2.27.0
cachetools==2.1.0
certifi==2019.3.9
chardet==3.0.4
click-replayer==0.0.1
configobj==4.7.2
debugtrace==0.0.1
enum34==1.1.6
funcsigs==1.0.2
futures==3.2.0
google-api-core==1.9.0
google-auth==1.6.3
google-cloud-pubsub==0.38.0
googleapis-common-protos==1.5.9
grpc-google-iam-v1==0.11.4
grpcio==1.20.0
idna==2.8
meld3==1.0.2
mock==2.0.0
MySQL-python==1.2.5
pbr==5.1.3
protobuf==3.7.1
py==1.8.0
pyasn1==0.4.5
pyasn1-modules==0.2.4
pygeoip==0.2.7
pyparsing==2.2.0
pytest==3.2.2
python-dateutil==2.2
pytz==2019.1
raven==4.0.4
redirecting==0.0.1
redis==2.10.1
requests==2.21.0
rsa==4.0
six==1.12.0
skimpubsub==1.1.4
SQLAlchemy==1.1.15
supervisor==3.2.4
ua-parser==0.8.0
ujson==1.35
urllib3==1.24.2
user-agents==1.1.0
uWSGI==2.0.14
Werkzeug==0.9.6

Extra

It mostly happens in one particular region, asia-southeast1-b, but in general it happens in all regions (US, EU, etc.).

@plamut
Contributor

plamut commented Apr 30, 2019

@jam182 Thank you for reporting this. From the error description it seems that this is the same issue as #7709 ?

Edit: Sorry, this is the publisher client, not the subscriber. But the symptom seems similar, and both issues might have the same underlying cause.

@plamut plamut added api: pubsub Issues related to the Pub/Sub API. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Apr 30, 2019
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label May 1, 2019
@tseaver tseaver added priority: p2 Moderately-important priority. Fix may not be included in next release. and removed triage me I really want to be triaged. labels May 1, 2019
@plamut
Contributor

plamut commented May 2, 2019

@jam182 Just as a sanity check, what does your publisher client code look like? Does it use the Future object returned by the publisher.publish() call and interact with it?

Something like the following:

import time

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

while True:
    msg = b"some message"
    future = publisher.publish(topic_path, msg)

    try:
        result = future.result()
    except Exception as ex:
        pass  # handle exception

    time.sleep(3)

I was able to replicate the reported behavior (an error not surfacing to the publisher code) if I faked an error in the underlying channel and also ignored the future object returned by the publisher.publish() call; more specifically, if the client script never called future.result().

I also tried an alternative approach that uses future.add_done_callback():

def my_callback(future):
    result = future.result()
    # do something with result

...
while True:
    ...
    future = publisher.publish(topic_path, msg)
    future.add_done_callback(my_callback)
    ...

In this case the exception occurred in the callback (on the future.result() line), but the main thread nevertheless kept running: the callback is invoked in the background by another thread, so the client script never terminated despite the error.

Could either of these two scenarios be applicable to your case?

@plamut plamut self-assigned this May 2, 2019
@jam182
Author

jam182 commented May 2, 2019

I'd say it's a combination of both.
We publish a message and delegate the double-checking of all the futures to a different thread, just in case Pub/Sub ever goes down.

The code would look something like this:

import logging
from functools import partial
from Queue import Queue, Full  # Python 2
from threading import Thread

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

events = Queue(maxsize=100000)


def check_response():
    while True:
        logger.debug('Checking the queue')
        message = events.get(block=True)
        logger.debug('Checking the message')
        _callback = partial(callback, message)
        message.response.add_done_callback(_callback)


def callback(message, future):
    try:
        logger.debug('Callback is waiting for the future')
        message_id = future.result(timeout=2)
        logger.debug('Future resolved: %s', message_id)
    except Exception:
        logger.exception('Failed to publish message. Retry.')
        message.retry()


class Message(object):
    """This represents one item in the events queue."""

    __slots__ = ('response', 'publish_func', 'record')

    def __init__(self, response, publish_func, record):
        self.response = response
        self.publish_func = publish_func
        self.record = record

    def retry(self):
        self.response = self.publish_func(self.record)
        try:
            events.put_nowait(self)
        except Full:
            logger.error('Events queue is full. Messages are not being checked anymore.')


class PubSubLogger(object):

    def __init__(self, config):
        # PubSubTopicClient wraps the Google publisher client and always
        # publishes to a specific project/topic.
        self.pub_sub_client = PubSubTopicClient(config['topic_name'])
        Thread(target=check_response).start()

    def log_event(self, event):
        try:
            events.put_nowait(
                Message(
                    self.pub_sub_client.publish_message(event),
                    self.pub_sub_client.publish_message,
                    event,
                )
            )
        except Full:
            logger.error('Clicks queue is full. Messages are not being checked anymore.')

We don't actually see the "Events queue is full" log message, though.
The other thing I should probably mention is that the above code runs in a uWSGI mule worker.

@jam182
Author

jam182 commented May 2, 2019

UPDATE: in the pip freeze I posted there is google-cloud-pubsub==0.38.0, but I just noticed that 0.40.0 is out, which includes #7071.

I updated our pubsub version to 0.40.0 and I actually got the RetryError exception propagated this time.

I think the way you reproduced the error was without calling future.result(), but I believe making that call is a good enough way to propagate the exception?

The only concern is: in our retry logic we reuse the same client object to issue a new publish() call after a failure. Is the client going to try to reconnect, or will it fail every time from now on? My guess is it shouldn't fail, because I can see in the logs that a message had to be retried but the retry queue never got full, which means the message was eventually sent. We want the thread not to die, but to keep checking and retrying to send messages.

@plamut
Contributor

plamut commented May 2, 2019

@jam182 Glad to hear that updating to 0.40.0 works now! 👍 (that was the version I tested with).

I think the way you reproduced the error was without calling the future.result() but I believe making that call is a good enough way to propagate the exception?

Indeed; the only way I managed to reproduce the bug was to not call future.result() anywhere. I basically just wanted to check that the publisher client was not being used incorrectly, e.g. like below:

try:
    client.publish(...)
except Exception as exc:
    # does not work this way: publish() returns a future instead of
    # raising publish errors synchronously
    pass

The only concerns is, in our retry logic we reuse the same client object in case of failure to issue a new publish() call. Is the client going to try to reconnect or it will fail every time for now on?

If the call is repeated with the same client instance, the same underlying channel will be used (the channel does not get recreated on additional publish() calls). It is thus likely that the error will persist and the client will enter another retry cycle, eventually either succeeding or raising another RetryError after a timeout.

FWIW, a RetryError indicates that the underlying error was transient and was expected to eventually sort itself out, but retrying took too long. Re-issuing a publish request with the same client would therefore mean "retrying the retry", and would have a chance to succeed eventually IMO.

On the other hand, if a non-retryable error occurs, the underlying machinery will not attempt to retry the request, and a different exception will be propagated up to the custom script. "Manually" retrying on such errors is probably futile.

@plamut
Contributor

plamut commented May 3, 2019

Closing this, as it has been resolved by upgrading pubsub to the latest version.

@jam182 Thank you for the effort on your side, too, and for providing all the details. Should the issue re-emerge, however, feel free to come back and re-open it. Thanks again!

@plamut plamut closed this as completed May 3, 2019