diff --git a/kafka/client.py b/kafka/client.py
index d8d77e457..1ac0aae58 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -19,6 +19,21 @@
 log = logging.getLogger(__name__)
 
 
+def time_metric(metric_name):
+    def decorator(fn):
+        @functools.wraps(fn)
+        def wrapper(self, *args, **kwargs):
+            start_time = time.time()
+            ret = fn(self, *args, **kwargs)
+
+            if self.metrics_responder:
+                self.metrics_responder(metric_name, time.time() - start_time)
+
+            return ret
+        return wrapper
+    return decorator
+
+
 class KafkaClient(object):
 
     CLIENT_ID = b'kafka-python'
@@ -28,12 +43,14 @@ class KafkaClient(object):
     # socket timeout.
     def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
-                 correlation_id=0):
+                 correlation_id=0,
+                 metrics_responder=None):
         # We need one connection to bootstrap
         self.client_id = kafka_bytestring(client_id)
         self.timeout = timeout
         self.hosts = collect_hosts(hosts)
         self.correlation_id = correlation_id
+        self.metrics_responder = metrics_responder
 
         # create connections only when we need them
         self.conns = {}
@@ -48,6 +65,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
     #   Private API  #
     ##################
 
+
     def _get_conn(self, host, port):
         """Get or create a connection to a broker using host and port"""
         host_key = (host, port)
@@ -422,6 +440,7 @@ def load_metadata_for_topics(self, *topics):
                         leader, None, None
                     )
 
+    @time_metric('metadata_request_timer')
     def send_metadata_request(self, payloads=[], fail_on_error=True,
                               callback=None):
         encoder = KafkaProtocol.encode_metadata_request
@@ -429,6 +448,7 @@ def send_metadata_request(self, payloads=[], fail_on_error=True,
 
         return self._send_broker_unaware_request(payloads, encoder, decoder)
 
+    @time_metric('produce_request_timer')
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
@@ -479,6 +499,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                 if resp is not None and (not fail_on_error or not
                 self._raise_on_response_error(resp))]
 
+    @time_metric('fetch_request_timer')
     def send_fetch_request(self, payloads=[], fail_on_error=True,
                            callback=None, max_wait_time=100, min_bytes=4096):
         """
@@ -499,6 +520,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
 
+    @time_metric('offset_request_timer')
     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
         resps = self._send_broker_aware_request(
@@ -509,6 +531,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
 
+    @time_metric('offset_commit_request_timer')
     def send_offset_commit_request(self, group, payloads=[],
                                    fail_on_error=True, callback=None):
         encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
@@ -519,6 +542,7 @@ def send_offset_commit_request(self, group, payloads=[],
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
 
+    @time_metric('offset_fetch_request_timer')
     def send_offset_fetch_request(self, group, payloads=[],
                                   fail_on_error=True, callback=None):
 
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index efe910528..6289d1bd2 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -38,6 +38,8 @@
     'auto_commit_interval_messages': None,
     'consumer_timeout_ms': -1,
 
+    'metrics_responder': None,
+
     # Currently unused
     'socket_receive_buffer_bytes': 64 * 1024,
     'num_consumer_fetchers': 1,
@@ -135,10 +137,13 @@ def configure(self, **configs):
                 'bootstrap_servers required to configure KafkaConsumer'
             )
 
+        self.metrics_responder = self._config['metrics_responder']
+
         self._client = KafkaClient(
             self._config['bootstrap_servers'],
             client_id=self._config['client_id'],
-            timeout=(self._config['socket_timeout_ms'] / 1000.0)
+            timeout=(self._config['socket_timeout_ms'] / 1000.0),
+            metrics_responder=self.metrics_responder
         )
 
     def set_topic_partitions(self, *topics):
@@ -348,6 +353,9 @@ def fetch_messages(self):
 
         for resp in responses:
             if isinstance(resp, FailedPayloadsError):
+                if self.metrics_responder:
+                    self.metrics_responder('failed_payloads_count', 1)
+
                 logger.warning('FailedPayloadsError attempting to fetch data')
                 self._refresh_metadata_on_error()
                 continue
@@ -357,6 +365,9 @@
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
+                if self.metrics_responder:
+                    self.metrics_responder('offset_out_of_range_count', 1)
+
                 logger.warning('OffsetOutOfRange: topic %s, partition %d, '
                                'offset %d (Highwatermark: %d)',
                                topic, partition,
@@ -369,6 +380,9 @@
                 continue
 
             except NotLeaderForPartitionError:
+                if self.metrics_responder:
+                    self.metrics_responder('not_leader_for_partition_count', 1)
+
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
                                topic, partition)
@@ -376,6 +390,9 @@ def fetch_messages(self):
                 continue
 
             except RequestTimedOutError:
+                if self.metrics_responder:
+                    self.metrics_responder('request_timed_out_count', 1)
+
                 logger.warning("RequestTimedOutError for %s - %d",
                                topic, partition)
                 continue