Use a responder function rather than a metrics library.
ucarion committed Jul 10, 2015
1 parent 0460cdd commit 92dbc52
Showing 2 changed files with 15 additions and 31 deletions.
kafka/client.py (9 changes: 4 additions & 5 deletions)
@@ -26,9 +26,8 @@ def wrapper(self, *args, **kwargs):
             start_time = time.time()
             ret = fn(self, *args, **kwargs)
 
-            if self.metrics:
-                metric = getattr(self.metrics, metric_name)
-                metric.addValue(time.time() - start_time)
+            if self.metrics_responder:
+                self.metrics_responder(metric_name, time.time() - start_time)
 
             return ret
         return wrapper
@@ -45,13 +44,13 @@ class KafkaClient(object):
     def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
                  correlation_id=0,
-                 metrics=None):
+                 metrics_responder=None):
         # We need one connection to bootstrap
         self.client_id = kafka_bytestring(client_id)
         self.timeout = timeout
         self.hosts = collect_hosts(hosts)
         self.correlation_id = correlation_id
-        self.metrics = metrics
+        self.metrics_responder = metrics_responder
 
         # create connections only when we need them
         self.conns = {}
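
With this change, KafkaClient no longer holds a scales metrics object; it simply calls whatever callable is passed as metrics_responder with a metric name and a value (the elapsed seconds for the request timers above). Below is a minimal sketch of wiring one up, assuming this branch is installed; the logger-based responder and the broker address are placeholder choices, not part of the commit.

import logging

from kafka import KafkaClient

logger = logging.getLogger('kafka.metrics')

def log_metric(metric_name, value):
    # Invoked by the client, e.g. ('metadata_request_timer', elapsed_seconds).
    logger.info('%s: %s', metric_name, value)

client = KafkaClient('localhost:9092', metrics_responder=log_metric)
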
kafka/consumer/kafka.py (37 changes: 11 additions & 26 deletions)
@@ -38,7 +38,7 @@
     'auto_commit_interval_messages': None,
     'consumer_timeout_ms': -1,
 
-    'enable_metrics': False,
+    'metrics_responder': None,
 
     # Currently unused
     'socket_receive_buffer_bytes': 64 * 1024,
@@ -137,28 +137,13 @@ def configure(self, **configs):
                 'bootstrap_servers required to configure KafkaConsumer'
             )
 
-        if self._config['enable_metrics']:
-            from greplin import scales
-            self.metrics = scales.collection('kafka',
-                scales.PmfStat('metadata_request_timer'),
-                scales.PmfStat('produce_request_timer'),
-                scales.PmfStat('fetch_request_timer'),
-                scales.PmfStat('offset_request_timer'),
-                scales.PmfStat('offset_commit_request_timer'),
-                scales.PmfStat('offset_fetch_request_timer'),
-
-                scales.IntStat('failed_payloads_count'),
-                scales.IntStat('offset_out_of_range_count'),
-                scales.IntStat('not_leader_for_partition_count'),
-                scales.IntStat('request_timed_out_count'))
-        else:
-            self.metrics = None
+        self.metrics_responder = self._config['metrics_responder']
 
         self._client = KafkaClient(
             self._config['bootstrap_servers'],
             client_id=self._config['client_id'],
             timeout=(self._config['socket_timeout_ms'] / 1000.0),
-            metrics=self.metrics
+            metrics_responder=self.metrics_responder
         )
 
     def set_topic_partitions(self, *topics):
@@ -364,8 +349,8 @@ def fetch_messages(self):
         for resp in responses:
 
             if isinstance(resp, FailedPayloadsError):
-                if self.metrics:
-                    self.metrics.failed_payloads_count += 1
+                if self.metrics_responder:
+                    self.metrics_responder('failed_payloads_count', 1)
 
                 logger.warning('FailedPayloadsError attempting to fetch data')
                 self._refresh_metadata_on_error()
@@ -376,8 +361,8 @@
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
-                if self.metrics:
-                    self.metrics.offset_out_of_range_count += 1
+                if self.metrics_responder:
+                    self.metrics_responder('offset_out_of_range_count', 1)
 
                 logger.warning('OffsetOutOfRange: topic %s, partition %d, '
                                'offset %d (Highwatermark: %d)',
@@ -391,8 +376,8 @@
                 continue
 
             except NotLeaderForPartitionError:
-                if self.metrics:
-                    self.metrics.not_leader_for_partition_count += 1
+                if self.metrics_responder:
+                    self.metrics_responder('not_leader_for_partition_count', 1)
 
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
@@ -401,8 +386,8 @@
                 continue
 
             except RequestTimedOutError:
-                if self.metrics:
-                    self.metrics.request_timed_out_count += 1
+                if self.metrics_responder:
+                    self.metrics_responder('request_timed_out_count', 1)
 
                 logger.warning("RequestTimedOutError for %s - %d",
                                topic, partition)
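
On the consumer side the same callable is supplied through the metrics_responder config key, and it receives both the request timers (elapsed seconds) and the error counters (an increment of 1 per occurrence, as in the hunks above). A rough usage sketch, again assuming this branch is installed; the topic name, broker address, and in-memory accumulator are illustrative only.

from collections import defaultdict

from kafka import KafkaConsumer

stats = defaultdict(float)

def accumulate(metric_name, value):
    # Timers report elapsed seconds; counters report increments of 1.
    stats[metric_name] += value

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         metrics_responder=accumulate)

for message in consumer:
    pass  # stats accumulates as fetch/offset/commit requests complete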
