Merge pull request dpkp#1 from ucarion/ucarion-add-metrics
Add metrics to KafkaClient.
ecanzonieri committed Jul 28, 2015
2 parents 2ec70e1 + 92dbc52 commit 3dc6be9
Showing 2 changed files with 43 additions and 2 deletions.
26 changes: 25 additions & 1 deletion kafka/client.py
@@ -19,6 +19,21 @@
 log = logging.getLogger(__name__)
 
 
+def time_metric(metric_name):
+    def decorator(fn):
+        @functools.wraps(fn)
+        def wrapper(self, *args, **kwargs):
+            start_time = time.time()
+            ret = fn(self, *args, **kwargs)
+
+            if self.metrics_responder:
+                self.metrics_responder(metric_name, time.time() - start_time)
+
+            return ret
+        return wrapper
+    return decorator
+
+
 class KafkaClient(object):
 
     CLIENT_ID = b'kafka-python'
@@ -28,12 +43,14 @@ class KafkaClient(object):
     # socket timeout.
     def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
-                 correlation_id=0):
+                 correlation_id=0,
+                 metrics_responder=None):
         # We need one connection to bootstrap
         self.client_id = kafka_bytestring(client_id)
         self.timeout = timeout
         self.hosts = collect_hosts(hosts)
         self.correlation_id = correlation_id
+        self.metrics_responder = metrics_responder
 
         # create connections only when we need them
         self.conns = {}
@@ -48,6 +65,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
     #   Private API  #
     ##################
 
+
     def _get_conn(self, host, port):
         """Get or create a connection to a broker using host and port"""
         host_key = (host, port)
@@ -422,13 +440,15 @@ def load_metadata_for_topics(self, *topics):
                 leader, None, None
             )
 
+    @time_metric('metadata_request_timer')
     def send_metadata_request(self, payloads=[], fail_on_error=True,
                               callback=None):
         encoder = KafkaProtocol.encode_metadata_request
         decoder = KafkaProtocol.decode_metadata_response
 
         return self._send_broker_unaware_request(payloads, encoder, decoder)
 
+    @time_metric('produce_request_timer')
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
@@ -479,6 +499,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                 if resp is not None and
                 (not fail_on_error or not self._raise_on_response_error(resp))]
 
+    @time_metric('fetch_request_timer')
     def send_fetch_request(self, payloads=[], fail_on_error=True,
                            callback=None, max_wait_time=100, min_bytes=4096):
         """
@@ -499,6 +520,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
 
+    @time_metric('offset_request_timer')
     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
         resps = self._send_broker_aware_request(
@@ -509,6 +531,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
 
+    @time_metric('offset_commit_request_timer')
     def send_offset_commit_request(self, group, payloads=[],
                                    fail_on_error=True, callback=None):
         encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
@@ -519,6 +542,7 @@ def send_offset_commit_request(self, group, payloads=[],
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
 
+    @time_metric('offset_fetch_request_timer')
     def send_offset_fetch_request(self, group, payloads=[],
                                   fail_on_error=True, callback=None):
 
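
The decorator above treats metrics_responder as a plain callable invoked with a metric name and a numeric value, so any stats backend can be adapted behind it. A minimal sketch of a compatible responder; the logging sink and broker address are illustrative assumptions, not part of this change:

import logging

from kafka.client import KafkaClient

log = logging.getLogger(__name__)


def log_metrics_responder(metric_name, value):
    # Called by the client with a metric name and a value; the *_timer
    # metrics added above report elapsed seconds from time.time().
    log.info('metric %s=%s', metric_name, value)


# Assumes a broker on localhost:9092; request timings are now reported
# through the responder supplied at construction time.
client = KafkaClient('localhost:9092',
                     metrics_responder=log_metrics_responder)
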
19 changes: 18 additions & 1 deletion kafka/consumer/kafka.py
@@ -38,6 +38,8 @@
     'auto_commit_interval_messages': None,
     'consumer_timeout_ms': -1,
 
+    'metrics_responder': None,
+
     # Currently unused
     'socket_receive_buffer_bytes': 64 * 1024,
     'num_consumer_fetchers': 1,
@@ -135,10 +137,13 @@ def configure(self, **configs):
                 'bootstrap_servers required to configure KafkaConsumer'
             )
 
+        self.metrics_responder = self._config['metrics_responder']
+
         self._client = KafkaClient(
             self._config['bootstrap_servers'],
             client_id=self._config['client_id'],
-            timeout=(self._config['socket_timeout_ms'] / 1000.0)
+            timeout=(self._config['socket_timeout_ms'] / 1000.0),
+            metrics_responder=self.metrics_responder
         )
 
     def set_topic_partitions(self, *topics):
@@ -348,6 +353,9 @@ def fetch_messages(self):
         for resp in responses:
 
             if isinstance(resp, FailedPayloadsError):
+                if self.metrics_responder:
+                    self.metrics_responder('failed_payloads_count', 1)
+
                 logger.warning('FailedPayloadsError attempting to fetch data')
                 self._refresh_metadata_on_error()
                 continue
@@ -357,6 +365,9 @@ def fetch_messages(self):
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
+                if self.metrics_responder:
+                    self.metrics_responder('offset_out_of_range_count', 1)
+
                 logger.warning('OffsetOutOfRange: topic %s, partition %d, '
                                'offset %d (Highwatermark: %d)',
                                topic, partition,
@@ -369,13 +380,19 @@ def fetch_messages(self):
                 continue
 
             except NotLeaderForPartitionError:
+                if self.metrics_responder:
+                    self.metrics_responder('not_leader_for_partition_count', 1)
+
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
                                topic, partition)
                 self._refresh_metadata_on_error()
                 continue
 
             except RequestTimedOutError:
+                if self.metrics_responder:
+                    self.metrics_responder('request_timed_out_count', 1)
+
                 logger.warning("RequestTimedOutError for %s - %d",
                                topic, partition)
                 continue
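
On the consumer side the same callable doubles as a counter sink: the error paths above emit (name, 1) increments, while the client-level timers keep reporting durations. A hypothetical wiring through KafkaConsumer, with the topic name and bootstrap server as placeholder values:

from collections import Counter

from kafka import KafkaConsumer

counters = Counter()


def counting_responder(metric_name, value):
    # Timer metrics arrive as elapsed seconds and counter metrics as 1;
    # everything is accumulated here for later inspection.
    counters[metric_name] += value


# 'my-topic' and the bootstrap server are placeholders; the
# metrics_responder key is read by configure() as shown above.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         metrics_responder=counting_responder)

for message in consumer:
    print(message)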
