Skip to content

Commit

Permalink
Overhaul logging background thread transport (#3407)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Wayne Parrott authored May 12, 2017
1 parent bda4dc0 commit a623929
Show file tree
Hide file tree
Showing 4 changed files with 401 additions and 181 deletions.
6 changes: 5 additions & 1 deletion logging/google/cloud/logging/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

DEFAULT_LOGGER_NAME = 'python'

EXCLUDED_LOGGER_DEFAULTS = ('google.cloud', 'oauth2client')
EXCLUDED_LOGGER_DEFAULTS = (
'google.cloud',
'google.auth',
'google_auth_httplib2',
)


class CloudLoggingHandler(logging.StreamHandler):
Expand Down
274 changes: 183 additions & 91 deletions logging/google/cloud/logging/handlers/transports/background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,139 +17,231 @@
Uses a background worker to log to Stackdriver Logging asynchronously.
"""

from __future__ import print_function

import atexit
import copy
import logging
import threading

from six.moves import range
from six.moves import queue

from google.cloud.logging.handlers.transports.base import Transport

_WORKER_THREAD_NAME = 'google.cloud.logging.handlers.transport.Worker'
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
_DEFAULT_MAX_BATCH_SIZE = 10
_WORKER_THREAD_NAME = 'google.cloud.logging.Worker'
_WORKER_TERMINATOR = object()
_LOGGER = logging.getLogger(__name__)


class _Worker(object):
"""A threaded worker that writes batches of log entries
def _get_many(queue_, max_items=None):
"""Get multiple items from a Queue.
Writes entries to the logger API.
Gets at least one (blocking) and at most ``max_items`` items
(non-blocking) from a given Queue. Does not mark the items as done.
This class reuses a single :class:`Batch` method to write successive
entries.
:type queue_: :class:`~queue.Queue`
:param queue_: The Queue to get items from.
Currently, the only public methods are constructing it (which also starts
it) and enqueuing :class:`Logger` (record, message) pairs.
:type max_items: int
:param max_items: The maximum number of items to get. If ``None``, then all
available items in the queue are returned.
:rtype: Sequence
:returns: A sequence of items retrieved from the queue.
"""
# Always return at least one item.
items = [queue_.get()]
while max_items is None or len(items) < max_items:
try:
items.append(queue_.get_nowait())
except queue.Empty:
break
return items

def __init__(self, logger):
self.started = False
self.stopping = False
self.stopped = False

# _entries_condition is used to signal from the main thread whether
# there are any waiting queued logger entries to be written
self._entries_condition = threading.Condition()
class _Worker(object):
"""A background thread that writes batches of log entries.
# _stop_condition is used to signal from the worker thread to the
# main thread that it's finished its last entries
self._stop_condition = threading.Condition()
:type cloud_logger: :class:`~google.cloud.logging.logger.Logger`
:param cloud_logger: The logger to send entries to.
# This object continually reuses the same :class:`Batch` object to
# write multiple entries at the same time.
self.logger = logger
self.batch = self.logger.batch()
:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.
:type max_batch_size: int
:param max_batch_size: The maximum number of items to send at a time
in the background thread.
"""

def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD,
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
self._cloud_logger = cloud_logger
self._grace_period = grace_period
self._max_batch_size = max_batch_size
self._queue = queue.Queue(0)
self._operational_lock = threading.Lock()
self._thread = None

# Number in seconds of how long to wait for worker to send remaining
self._stop_timeout = 5
@property
def is_alive(self):
"""Returns True if the background thread is running.

:rtype: bool
:returns: True when a thread object is currently held in ``_thread``
and that thread reports itself alive; False when no thread is
held (``_thread`` is ``None``) or the held thread has exited.
"""
return self._thread is not None and self._thread.is_alive()

self._start()
def _safely_commit_batch(self, batch):
total_logs = len(batch.entries)

def _run(self):
try:
if total_logs > 0:
batch.commit()
_LOGGER.debug('Submitted %d logs', total_logs)
except Exception:
_LOGGER.error(
'Failed to submit %d logs.', total_logs, exc_info=True)

def _thread_main(self):
"""The entry point for the worker thread.
Loops until ``stopping`` is set to :data:`True`, and commits batch
entries written during :meth:`enqueue`.
Pulls pending log entries off the queue and writes them in batches to
the Cloud Logger.
"""
try:
self._entries_condition.acquire()
self.started = True
while not self.stopping:
if len(self.batch.entries) == 0:
# branch coverage of this code extremely flaky
self._entries_condition.wait() # pragma: NO COVER

if len(self.batch.entries) > 0:
self.batch.commit()
finally:
self._entries_condition.release()

# main thread may be waiting for worker thread to finish writing its
# final entries. here we signal that it's done.
self._stop_condition.acquire()
self._stop_condition.notify()
self._stop_condition.release()

def _start(self):
"""Called by this class's constructor
This method is responsible for starting the thread and registering
the exit handlers.
_LOGGER.debug('Background thread started.')

quit_ = False
while True:
batch = self._cloud_logger.batch()
items = _get_many(self._queue, max_items=self._max_batch_size)

for item in items:
if item is _WORKER_TERMINATOR:
quit_ = True
# Continue processing items, don't break, try to process
# all items we got back before quitting.
else:
batch.log_struct(**item)

self._safely_commit_batch(batch)

for _ in range(len(items)):
self._queue.task_done()

if quit_:
break

_LOGGER.debug('Background thread exited gracefully.')

def start(self):
"""Starts the background thread.
Additionally, this registers a handler for process exit to attempt
to send any pending log entries before shutdown.
"""
try:
self._entries_condition.acquire()
with self._operational_lock:
if self.is_alive:
return

self._thread = threading.Thread(
target=self._run, name=_WORKER_THREAD_NAME)
self._thread.setDaemon(True)
target=self._thread_main,
name=_WORKER_THREAD_NAME)
self._thread.daemon = True
self._thread.start()
finally:
self._entries_condition.release()
atexit.register(self._stop)
atexit.register(self._main_thread_terminated)

def stop(self, grace_period=None):
"""Signals the background thread to stop.
def _stop(self):
"""Signals the worker thread to shut down
This does not terminate the background thread. It simply queues the
stop signal. If the main process exits before the background thread
processes the stop signal, it will be terminated without finishing
work. The ``grace_period`` parameter will give the background
thread some time to finish processing before this function returns.
Also waits for ``stop_timeout`` seconds for the worker to finish.
:type grace_period: float
:param grace_period: If specified, this method will block up to this
many seconds to allow the background thread to finish work before
returning.
This method is called by the ``atexit`` handler registered by
:meth:`start`.
:rtype: bool
:returns: True if the thread terminated. False if the thread is still
running.
"""
if not self.started or self.stopping:
return
if not self.is_alive:
return True

with self._operational_lock:
self._queue.put_nowait(_WORKER_TERMINATOR)

if grace_period is not None:
print('Waiting up to %d seconds.' % (grace_period,))

# lock the stop condition first so that the worker
# thread can't notify it's finished before we wait
self._stop_condition.acquire()
self._thread.join(timeout=grace_period)

# now notify the worker thread to shutdown
self._entries_condition.acquire()
self.stopping = True
self._entries_condition.notify()
self._entries_condition.release()
# Check this before disowning the thread, because after we disown
# the thread is_alive will be False regardless of if the thread
# exited or not.
success = not self.is_alive

# now wait for it to signal it's finished
self._stop_condition.wait(self._stop_timeout)
self._stop_condition.release()
self.stopped = True
self._thread = None

return success

def _main_thread_terminated(self):
"""Callback that attempts to send pending logs before termination.

Does nothing when the background thread is not alive. Otherwise it
announces how many entries are still queued (if any), calls
:meth:`stop` with the configured grace period, and prints whether the
thread finished in time.
"""
# Nothing to flush if the worker is not running.
if not self.is_alive:
return

# Only announce the flush when there is something left in the queue.
if not self._queue.empty():
print(
'Program shutting down, attempting to send %d queued log '
'entries to Stackdriver Logging...' % (self._queue.qsize(),))

# stop() is given the grace period and its boolean result decides
# which outcome message is printed.
if self.stop(self._grace_period):
print('Sent all pending logs.')
else:
# The count is read after stop() returned; whether the worker is
# still draining the queue at this point is not controlled here.
print('Failed to send %d pending logs.' % (self._queue.qsize(),))

def enqueue(self, record, message):
"""Queues up a log entry to be written by the background thread."""
try:
self._entries_condition.acquire()
if self.stopping:
return
info = {'message': message, 'python_logger': record.name}
self.batch.log_struct(info, severity=record.levelname)
self._entries_condition.notify()
finally:
self._entries_condition.release()
"""Queues a log entry to be written by the background thread.
:type record: :class:`logging.LogRecord`
:param record: Python log record that the handler was called with.
:type message: str
:param message: The message from the ``LogRecord`` after being
formatted by the associated log formatters.
"""
self._queue.put_nowait({
'info': {
'message': message,
'python_logger': record.name,
},
'severity': record.levelname,
})


class BackgroundThreadTransport(Transport):
"""Aysnchronous transport that uses a background thread.
"""Asynchronous transport that uses a background thread.
:type client: :class:`~google.cloud.logging.client.Client`
:param client: The Logging client.
:type name: str
:param name: the name of the logger.
:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.
Writes logging entries as a batch process.
:type batch_size: int
:param batch_size: The maximum number of items to send at a time in the
background thread.
"""

def __init__(self, client, name):
def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
batch_size=_DEFAULT_MAX_BATCH_SIZE):
http = copy.deepcopy(client._http)
self.client = client.__class__(
client.project, client._credentials, http)
Expand Down
4 changes: 2 additions & 2 deletions logging/nox.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def unit_tests(session, python_version):
'py.test', '--quiet',
'--cov=google.cloud.logging', '--cov=tests.unit', '--cov-append',
'--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97',
'tests/unit',
'tests/unit', *session.posargs
)


Expand All @@ -63,7 +63,7 @@ def system_tests(session, python_version):
session.install('.')

# Run py.test against the system tests.
session.run('py.test', '-vvv', 'tests/system.py')
session.run('py.test', '-vvv', 'tests/system.py', *session.posargs)


@nox.session
Expand Down
Loading

0 comments on commit a623929

Please sign in to comment.