Skip to content

Commit

Permalink
Pub/Sub logging hygiene. (#4471)
Browse files Browse the repository at this point in the history
Uses newline character when logging protobufs that may end up having
newlines in them. These are harder to grok in the log output since
it "blends" the protobuf and the description. For example.

    Received response: received_messages {
      ack_id: "Pn41MEV..."
      message {
        data: ...

vs.

    Received response:
    received_messages {
      ack_id: "Pn41MEV..."
      message {
        data: ...

Also

- replaced a lambda with a simple function
- added docstrings to two default callbacks used by the
  `policy.thread.Policy` implementation.
- updated grammatical error in a comment
- replaced a bare `if future` with `if future is not None`
- using repr() of protobufs (rather than str())
  • Loading branch information
dhermes authored Nov 28, 2017
1 parent fd5b817 commit 56fc884
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
8 changes: 3 additions & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ def _request_generator_thread(self):
# First, yield the initial request. This occurs on every new
# connection, fundamentally including a resumed connection.
initial_request = self._policy.get_initial_request(ack_queue=True)
_LOGGER.debug('Sending initial request: {initial_request}'.format(
initial_request=initial_request,
))
_LOGGER.debug('Sending initial request:\n%r', initial_request)
yield initial_request

# Now yield each of the items on the request queue, and block if there
Expand All @@ -215,7 +213,7 @@ def _request_generator_thread(self):
_LOGGER.debug('Request generator signaled to stop.')
break

_LOGGER.debug('Sending request: {}'.format(request))
_LOGGER.debug('Sending request:\n%r', request)
yield request

def _blocking_consume(self):
Expand All @@ -233,7 +231,7 @@ def _blocking_consume(self):
response_generator = self._policy.call_rpc(request_generator)
try:
for response in response_generator:
_LOGGER.debug('Received response: {0}'.format(response))
_LOGGER.debug('Received response:\n%r', response)
self._policy.on_response(response)

# If the loop above exits without an exception, then the
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def maintain_leases(self):
in an appropriate form of subprocess.
"""
while True:
# Sanity check: Should this infinitely loop quit?
# Sanity check: Should this infinite loop quit?
if not self._consumer.active:
return

Expand Down
32 changes: 28 additions & 4 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,34 @@


def _callback_completed(future):
"""Simple callback that just logs a `Future`'s result."""
"""Simple callback that just logs a future's result.
Used on completion of processing a message received by a
subscriber.
Args:
future (concurrent.futures.Future): A future returned
from :meth:`~concurrent.futures.Executor.submit`.
"""
_LOGGER.debug('Result: %s', future.result())


def _do_nothing_callback(message):
"""Default callback for messages received by subscriber.
Does nothing with the message and returns :data:`None`.
Args:
message (~google.cloud.pubsub_v1.subscriber.message.Message): A
protobuf message returned by the backend and parsed into
our high level message type.
Returns:
NoneType: Always.
"""
return None


class Policy(base.BasePolicy):
"""A consumer class based on :class:`threading.Thread`.
Expand All @@ -62,7 +86,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
``executor``.
"""
# Default the callback to a no-op; it is provided by `.open`.
self._callback = lambda message: None
self._callback = _do_nothing_callback

# Default the future to None; it is provided by `.open`.
self._future = None
Expand Down Expand Up @@ -97,7 +121,7 @@ def close(self):

# The subscription is closing cleanly; resolve the future if it is not
# resolved already.
if self._future and not self._future.done():
if self._future is not None and not self._future.done():
self._future.set_result(None)
self._future = None

Expand Down Expand Up @@ -167,7 +191,7 @@ def on_response(self, response):
For each message, schedule a callback with the executor.
"""
for msg in response.received_messages:
_LOGGER.debug('New message received from Pub/Sub: %r', msg)
_LOGGER.debug('New message received from Pub/Sub:\n%r', msg)
_LOGGER.debug(self._callback)
message = Message(msg.message, msg.ack_id, self._request_queue)
future = self._executor.submit(self._callback, message)
Expand Down

0 comments on commit 56fc884

Please sign in to comment.