From a356394737c4235258691fe7f2c40f542bcd71e7 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Thu, 12 Mar 2020 16:25:01 +0200
Subject: [PATCH 1/7] Add zstd compression support to the kafka client

---
 requirements-dev.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements-dev.txt b/requirements-dev.txt
index d2830905b..77b8b5134 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -15,3 +15,4 @@ pytest-mock==1.10.0
 sphinx-rtd-theme==0.2.4
 crc32c==1.7
 py==1.8.0
+zstandard==0.13.0
\ No newline at end of file
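For reference, the pinned `zstandard` package is the library that backs the new codec; this patch only adds the dev dependency, with the codec wiring landing separately. A minimal round-trip sketch against that library's documented API (the payload bytes are made up for illustration):

```python
import zstandard as zstd

# Frames produced by compress() embed the content size, so the
# decompressor needs no explicit output-size hint.
payload = b'{"event": "pageview", "ts": 1584024301}'
compressed = zstd.ZstdCompressor().compress(payload)

# Round-trip back to the original bytes.
assert zstd.ZstdDecompressor().decompress(compressed) == payload
```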
From 93a604dd37e9af22d1a224fd946c27f54450c466 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Mon, 16 Mar 2020 00:06:47 +0100
Subject: [PATCH 2/7] Add all required produce API changes to a separate
 branch (basically everything except the zstd-related changes)

---
 kafka/producer/future.py             |  6 +--
 kafka/producer/record_accumulator.py |  8 +--
 kafka/producer/sender.py             | 24 +++++++--
 kafka/protocol/produce.py            | 79 ++++++++++++++++++++++++++--
 4 files changed, 102 insertions(+), 15 deletions(-)

diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index f67db0979..07fa4adb4 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
         produce_future.add_errback(self.failure)
 
     def _produce_success(self, offset_and_timestamp):
-        offset, produce_timestamp_ms = offset_and_timestamp
+        offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
         if offset != -1 and relative_offset is not None:
             offset += relative_offset
         tp = self._produce_future.topic_partition
-        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
                                   checksum, serialized_key_size,
                                   serialized_value_size, serialized_header_size)
         self.success(metadata)
@@ -67,5 +67,5 @@ def get(self, timeout=None):
 
 
 RecordMetadata = collections.namedtuple(
-    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
                        'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0de5f98e7..051ae5cfc 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,16 +68,16 @@ def try_append(self, timestamp_ms, key, value, headers):
             sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
         level = logging.DEBUG if exception is None else logging.WARNING
         log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s and error %s.", self.topic_partition, base_offset,
-                exception)  # trace
+                " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+                log_start_offset, exception)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success((base_offset, timestamp_ms))
+            self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
             self.produce_future.failure(exception)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 705b58f9a..a95abd1ce 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -195,13 +195,18 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
 
             for topic, partitions in response.topics:
                 for partition_info in partitions:
+                    error_message = None
                     if response.API_VERSION < 2:
                         partition, error_code, offset = partition_info
                         ts = None
-                    else:
+                    elif 2 <= response.API_VERSION <= 4:
                         partition, error_code, offset, ts = partition_info
+                    elif 5 <= response.API_VERSION <= 7:
+                        partition, error_code, offset, ts, log_start_offset = partition_info
+                    else:
+                        partition, error_code, offset, ts, log_start_offset, _, error_message = partition_info
                     tp = TopicPartition(topic, partition)
-                    error = Errors.for_code(error_code)
+                    error = error_message or Errors.for_code(error_code)
                     batch = batches_by_partition[tp]
                     self._complete_batch(batch, error, offset, ts)
 
@@ -213,7 +218,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, None, -1, None)
 
-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
         """Complete or retry the given batch of records.
 
         Arguments:
@@ -221,6 +226,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+            log_start_offset (int): The start offset of the log at the time this produce response was created
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -240,7 +246,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
             error = error(batch.topic_partition.topic)
 
         # tell the user the result of their request
-        batch.done(base_offset, timestamp_ms, error)
+        batch.done(base_offset, timestamp_ms, error, log_start_offset)
         self._accumulator.deallocate(batch)
         if error is not None:
             self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
@@ -293,7 +299,15 @@ def _produce_request(self, node_id, acks, timeout, batches):
             produce_records_by_partition[topic][partition] = buf
 
         kwargs = {}
-        if self.config['api_version'] >= (0, 11):
+        if self.config['api_version'] >= (2, 1):
+            version = 7
+        elif self.config['api_version'] >= (2, 0):
+            version = 6
+        elif self.config['api_version'] >= (1, 1):
+            version = 5
+        elif self.config['api_version'] >= (1, 0):
+            version = 4
+        elif self.config['api_version'] >= (0, 11):
             version = 3
             kwargs = dict(transactional_id=None)
         elif self.config['api_version'] >= (0, 10):
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index f4032b311..a32a53975 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -61,7 +61,6 @@ class ProduceResponse_v4(Response):
     API_VERSION = 4
     SCHEMA = ProduceResponse_v3.SCHEMA
 
-
 class ProduceResponse_v5(Response):
     API_KEY = 0
     API_VERSION = 5
@@ -78,6 +77,50 @@ class ProduceResponse_v5(Response):
     )
 
 
+class ProduceResponse_v6(Response):
+    """
+    The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
+    """
+    API_KEY = 0
+    API_VERSION = 6
+    SCHEMA = ProduceResponse_v5.SCHEMA
+
+
+class ProduceResponse_v7(Response):
+    """
+    V7 bumped up to indicate ZStandard capability. (see KIP-110)
+    """
+    API_KEY = 0
+    API_VERSION = 7
+    SCHEMA = ProduceResponse_v6.SCHEMA
+
+
+class ProduceResponse_v8(Response):
+    """
+    V8 bumped up to add two new fields: a record_errors list of
+    (batch_index, error_message) pairs and an error_message string (see KIP-467)
+    """
+    API_KEY = 0
+    API_VERSION = 8
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('log_start_offset', Int64),
+                ('record_errors', Array(
+                    ('batch_index', Int32),
+                    ('batch_index_error_message', String('utf-8'))
+                )),
+                ('error_message', String('utf-8'))
+            ))
+        )),
+        ('throttle_time_ms', Int32)
+    )
+
+
 class ProduceRequest(Request):
     API_KEY = 0
@@ -147,11 +190,41 @@ class ProduceRequest_v5(ProduceRequest):
     SCHEMA = ProduceRequest_v4.SCHEMA
 
 
+class ProduceRequest_v6(ProduceRequest):
+    """
+    The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
+    """
+    API_VERSION = 6
+    RESPONSE_TYPE = ProduceResponse_v6
+    SCHEMA = ProduceRequest_v5.SCHEMA
+
+
+class ProduceRequest_v7(ProduceRequest):
+    """
+    V7 bumped up to indicate ZStandard capability. (see KIP-110)
+    """
+    API_VERSION = 7
+    RESPONSE_TYPE = ProduceResponse_v7
+    SCHEMA = ProduceRequest_v6.SCHEMA
+
+
+class ProduceRequest_v8(ProduceRequest):
+    """
+    V8 bumped up to add two new fields to PartitionResponse: a record_errors list of
+    (batch_index, error_message) pairs and an error_message string (see KIP-467)
+    """
+    API_VERSION = 8
+    RESPONSE_TYPE = ProduceResponse_v8
+    SCHEMA = ProduceRequest_v7.SCHEMA
+
+
 ProduceRequest = [
     ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
-    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
+    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
+    ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
 ]
 ProduceResponse = [
     ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
-    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
+    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
+    ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
 ]
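By the end of the series, the broker's log start offset surfaces on the `RecordMetadata` handed back to callers. A hedged usage sketch of that effect (topic name and bootstrap address are placeholder values):

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# send() returns a future; get() resolves it to the RecordMetadata
# namedtuple extended above with the new log_start_offset field.
metadata = producer.send("demo-topic", b"hello").get(timeout=10)
print(metadata.offset, metadata.log_start_offset)
```

Against brokers that only speak produce API v4 or lower, the response carries no such field, so `log_start_offset` comes back as `None`.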
+ """ + API_KEY = 0 + API_VERSION = 6 + SCHEMA = ProduceResponse_v5.SCHEMA + + +class ProduceResponse_v7(Response): + """ + V7 bumped up to indicate ZStandard capability. (see KIP-110) + """ + API_KEY = 0 + API_VERSION = 7 + SCHEMA = ProduceResponse_v6.SCHEMA + + +class ProduceResponse_v8(Response): + """ + V8 bumped up to add two new fields record_errors offset list and error_message + (See KIP-467) + """ + API_KEY = 0 + API_VERSION = 8 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('offset', Int64), + ('timestamp', Int64), + ('log_start_offset', Int64)), + ('record_errors', (Array( + ('batch_index', Int32), + ('batch_index_error_message', String('utf-8')) + ))), + ('error_message', String('utf-8')) + ))), + ('throttle_time_ms', Int32) + ) + + class ProduceRequest(Request): API_KEY = 0 @@ -147,11 +190,41 @@ class ProduceRequest_v5(ProduceRequest): SCHEMA = ProduceRequest_v4.SCHEMA +class ProduceRequest_v6(ProduceRequest): + """ + The version number is bumped to indicate that on quota violation brokers send out responses before throttling. + """ + API_VERSION = 6 + RESPONSE_TYPE = ProduceResponse_v6 + SCHEMA = ProduceRequest_v5.SCHEMA + + +class ProduceRequest_v7(ProduceRequest): + """ + V7 bumped up to indicate ZStandard capability. (see KIP-110) + """ + API_VERSION = 7 + RESPONSE_TYPE = ProduceResponse_v7 + SCHEMA = ProduceRequest_v6.SCHEMA + + +class ProduceRequest_v8(ProduceRequest): + """ + V8 bumped up to add two new fields record_errors offset list and error_message to PartitionResponse + (See KIP-467) + """ + API_VERSION = 8 + RESPONSE_TYPE = ProduceResponse_v8 + SCHEMA = ProduceRequest_v7.SCHEMA + + ProduceRequest = [ ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2, - ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5 + ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5, + ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8, ] ProduceResponse = [ ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2, - ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5 + ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5, + ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8, ] From 63b4ef853c21e0a3d2def88aff5b1374db7417d0 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 16 Mar 2020 18:36:34 +0100 Subject: [PATCH 3/7] Newline between class defs, revert requirements file --- kafka/protocol/produce.py | 2 ++ requirements-dev.txt | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index a32a53975..9b3f6bf55 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -61,6 +61,7 @@ class ProduceResponse_v4(Response): API_VERSION = 4 SCHEMA = ProduceResponse_v3.SCHEMA + class ProduceResponse_v5(Response): API_KEY = 0 API_VERSION = 5 @@ -149,6 +150,7 @@ class ProduceRequest_v1(ProduceRequest): RESPONSE_TYPE = ProduceResponse_v1 SCHEMA = ProduceRequest_v0.SCHEMA + class ProduceRequest_v2(ProduceRequest): API_VERSION = 2 RESPONSE_TYPE = ProduceResponse_v2 diff --git a/requirements-dev.txt b/requirements-dev.txt index 77b8b5134..d2830905b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,4 +15,3 @@ pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 py==1.8.0 -zstandard==0.13.0 \ No newline at end of file From 016b8c92b5181104e0cd3f9e11774c749c31f567 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Thu, 19 Mar 2020 20:09:53 +0100 Subject: [PATCH 
From 016b8c92b5181104e0cd3f9e11774c749c31f567 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Thu, 19 Mar 2020 20:09:53 +0100
Subject: [PATCH 4/7] Use old error api

---
 kafka/producer/record_accumulator.py |  4 ++--
 kafka/producer/sender.py             | 14 +++++++-------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 051ae5cfc..a2aa0e8ec 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,11 +68,11 @@ def try_append(self, timestamp_ms, key, value, headers):
             sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
         level = logging.DEBUG if exception is None else logging.WARNING
         log.log(level, "Produced messages to topic-partition %s with base offset"
                 " %s log start offset %s and error %s.", self.topic_partition, base_offset,
-                log_start_offset, exception)  # trace
+                log_start_offset, global_error)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index a95abd1ce..79c7d154c 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -195,7 +195,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
 
             for topic, partitions in response.topics:
                 for partition_info in partitions:
-                    error_message = None
+                    global_error = None
                     if response.API_VERSION < 2:
                         partition, error_code, offset = partition_info
                         ts = None
@@ -204,11 +204,11 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
                     elif 5 <= response.API_VERSION <= 7:
                         partition, error_code, offset, ts, log_start_offset = partition_info
                     else:
-                        partition, error_code, offset, ts, log_start_offset, _, error_message = partition_info
+                        partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
                     tp = TopicPartition(topic, partition)
-                    error = error_message or Errors.for_code(error_code)
+                    error = Errors.for_code(error_code)
                     batch = batches_by_partition[tp]
-                    self._complete_batch(batch, error, offset, ts)
+                    self._complete_batch(batch, error, offset, ts, global_error)
 
         if response.API_VERSION > 0:
             self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
@@ -218,7 +218,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, None, -1, None)
 
-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
         """Complete or retry the given batch of records.
 
         Arguments:
@@ -238,7 +238,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
                         " retrying (%d attempts left). Error: %s",
                         batch.topic_partition,
                         self.config['retries'] - batch.attempts - 1,
-                        error)
+                        global_error or error)
             self._accumulator.reenqueue(batch)
             self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
@@ -246,7 +246,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
             error = error(batch.topic_partition.topic)
 
         # tell the user the result of their request
-        batch.done(base_offset, timestamp_ms, error, log_start_offset)
+        batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
         self._accumulator.deallocate(batch)
         if error is not None:
             self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
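The revert matters because `kafka.errors.for_code` maps the wire error code to an exception class, and downstream logic (retry decisions, raising in `future.get()`) dispatches on that class; the free-form v8 error message is only useful for logging. A small sketch of the distinction, using error code 7 as an illustrative value:

```python
import kafka.errors as Errors

# for_code() returns an exception *class*; its retriable flag is what
# drives the retry path in the sender, not the broker's message text.
error = Errors.for_code(7)  # 7 is REQUEST_TIMED_OUT
print(error.__name__, error.retriable)  # RequestTimedOutError True
```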
Error: %s", batch.topic_partition, self.config['retries'] - batch.attempts - 1, - error) + global_error or error) self._accumulator.reenqueue(batch) self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: @@ -246,7 +246,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star error = error(batch.topic_partition.topic) # tell the user the result of their request - batch.done(base_offset, timestamp_ms, error, log_start_offset) + batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error) self._accumulator.deallocate(batch) if error is not None: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) From aaf61ff89a6476196b50d7549527e82294474696 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Thu, 19 Mar 2020 20:19:04 +0100 Subject: [PATCH 5/7] Update docstring --- kafka/producer/sender.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 79c7d154c..31e2609ab 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -196,6 +196,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response): for topic, partitions in response.topics: for partition_info in partitions: global_error = None + log_start_offset = None if response.API_VERSION < 2: partition, error_code, offset = partition_info ts = None @@ -208,7 +209,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response): tp = TopicPartition(topic, partition) error = Errors.for_code(error_code) batch = batches_by_partition[tp] - self._complete_batch(batch, error, offset, ts, global_error) + self._complete_batch(batch, error, offset, ts, log_start_offset, global_error) if response.API_VERSION > 0: self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id) @@ -227,6 +228,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star base_offset (int): The base offset assigned to the records if successful timestamp_ms (int, optional): The timestamp returned by the broker for this batch log_start_offset (int): The start offset of the log at the time this produce response was created + global_error (Exception): The summarising error message """ # Standardize no-error to None if error is Errors.NoError: From c03d24e970d0f060e048a15e1492a33c602b4ab6 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Sat, 21 Mar 2020 00:30:06 +0100 Subject: [PATCH 6/7] Update docstring with proper error type --- kafka/producer/sender.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 31e2609ab..872013706 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -228,7 +228,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star base_offset (int): The base offset assigned to the records if successful timestamp_ms (int, optional): The timestamp returned by the broker for this batch log_start_offset (int): The start offset of the log at the time this produce response was created - global_error (Exception): The summarising error message + global_error (str): The summarising error message """ # Standardize no-error to None if error is Errors.NoError: From b62aedd83775aaf45ee656bec56eabea70268ff2 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Wed, 25 Mar 2020 16:02:51 +0100 Subject: [PATCH 7/7] Add comment indicating type ignored unpacked value from produce response for future reference --- 
From b62aedd83775aaf45ee656bec56eabea70268ff2 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Wed, 25 Mar 2020 16:02:51 +0100
Subject: [PATCH 7/7] Add comment indicating the type of the ignored unpacked
 value from the produce response, for future reference

---
 kafka/producer/sender.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 872013706..35688d3f1 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -205,6 +205,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
                 elif 5 <= response.API_VERSION <= 7:
                     partition, error_code, offset, ts, log_start_offset = partition_info
                 else:
+                    # the ignored parameter is record_errors, of type list[(batch_index: int, batch_index_error_message: str)]
                     partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
                 tp = TopicPartition(topic, partition)
                 error = Errors.for_code(error_code)
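Taken together, the series leaves `_handle_produce_response` unpacking a seven-field partition entry for v8 responses, per the `ProduceResponse_v8` schema and the comment added above. A sketch of the expected tuple shape; all values are made up for illustration:

```python
# (partition, error_code, offset, timestamp, log_start_offset,
#  record_errors, error_message)
partition_info = (
    0,                                 # partition index
    7,                                 # error_code (REQUEST_TIMED_OUT)
    -1,                                # offset (none assigned on failure)
    -1,                                # timestamp
    1000,                              # log_start_offset
    [(3, "record exceeds max size")],  # record_errors: (batch_index, message)
    "one or more records failed",      # error_message
)

# Mirrors the sender's unpacking: record_errors is discarded via "_".
(partition, error_code, offset, ts,
 log_start_offset, _, global_error) = partition_info
```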