diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index f67db0979..07fa4adb4 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
         produce_future.add_errback(self.failure)
 
     def _produce_success(self, offset_and_timestamp):
-        offset, produce_timestamp_ms = offset_and_timestamp
+        offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
         if offset != -1 and relative_offset is not None:
             offset += relative_offset
         tp = self._produce_future.topic_partition
-        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
                                   checksum, serialized_key_size,
                                   serialized_value_size, serialized_header_size)
         self.success(metadata)
@@ -67,5 +67,5 @@ def get(self, timeout=None):
 
 
 RecordMetadata = collections.namedtuple(
-    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
                        'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
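
Note: with this change every RecordMetadata handed back to user code carries the
partition's log start offset. A minimal sketch of how the new field surfaces,
assuming a reachable broker on 'localhost:9092' and a placeholder topic name
(both illustrative, not part of the patch):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # send() returns a FutureRecordMetadata; get() blocks for the broker ack
    metadata = producer.send('demo-topic', b'hello').get(timeout=10)
    # log_start_offset is None when the broker answers with produce v4 or older
    print(metadata.offset, metadata.timestamp, metadata.log_start_offset)
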
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0de5f98e7..a2aa0e8ec 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,16 +68,16 @@ def try_append(self, timestamp_ms, key, value, headers):
             sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
         level = logging.DEBUG if exception is None else logging.WARNING
         log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s and error %s.", self.topic_partition, base_offset,
-                exception)  # trace
+                " %s, log start offset %s, and error %s.", self.topic_partition, base_offset,
+                log_start_offset, global_error)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success((base_offset, timestamp_ms))
+            self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
             self.produce_future.failure(exception)
 
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 705b58f9a..35688d3f1 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -195,15 +195,22 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
 
             for topic, partitions in response.topics:
                 for partition_info in partitions:
+                    global_error = None
+                    log_start_offset = None
                     if response.API_VERSION < 2:
                         partition, error_code, offset = partition_info
                         ts = None
-                    else:
+                    elif 2 <= response.API_VERSION <= 4:
                         partition, error_code, offset, ts = partition_info
+                    elif 5 <= response.API_VERSION <= 7:
+                        partition, error_code, offset, ts, log_start_offset = partition_info
+                    else:
+                        # The ignored field is record_errors, a list of (batch_index: int, batch_index_error_message: str)
+                        partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
                     tp = TopicPartition(topic, partition)
                     error = Errors.for_code(error_code)
                     batch = batches_by_partition[tp]
-                    self._complete_batch(batch, error, offset, ts)
+                    self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
 
             if response.API_VERSION > 0:
                 self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
@@ -213,7 +220,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, None, -1, None)
 
-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
         """Complete or retry the given batch of records.
 
         Arguments:
@@ -221,6 +228,8 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+            log_start_offset (int, optional): The start offset of the log at the time this produce response was created
+            global_error (str, optional): The summarising error message
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -232,7 +241,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
                 " retrying (%d attempts left). Error: %s",
                 batch.topic_partition,
                 self.config['retries'] - batch.attempts - 1,
-                error)
+                global_error or error)
             self._accumulator.reenqueue(batch)
             self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
@@ -240,7 +249,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
                 error = error(batch.topic_partition.topic)
 
             # tell the user the result of their request
-            batch.done(base_offset, timestamp_ms, error)
+            batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
             self._accumulator.deallocate(batch)
             if error is not None:
                 self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
@@ -293,7 +302,15 @@ def _produce_request(self, node_id, acks, timeout, batches):
             produce_records_by_partition[topic][partition] = buf
 
         kwargs = {}
-        if self.config['api_version'] >= (0, 11):
+        if self.config['api_version'] >= (2, 1):
+            version = 7
+        elif self.config['api_version'] >= (2, 0):
+            version = 6
+        elif self.config['api_version'] >= (1, 1):
+            version = 5
+        elif self.config['api_version'] >= (1, 0):
+            version = 4
+        elif self.config['api_version'] >= (0, 11):
            version = 3
             kwargs = dict(transactional_id=None)
         elif self.config['api_version'] >= (0, 10):
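
Note: the sender now picks the produce wire version from the negotiated broker
version. A self-contained sketch of that mapping; produce_version_for is an
illustrative helper, not part of the patch, and the (0, 11) and (0, 10) rows
come from the pre-existing branches (the hunk truncates after the (0, 10)
check, so pre-0.10 fallbacks are simplified here):

    def produce_version_for(api_version):
        """Highest ProduceRequest version this client will use for a broker."""
        for min_broker_version, produce_version in [
                ((2, 1), 7), ((2, 0), 6), ((1, 1), 5), ((1, 0), 4),
                ((0, 11), 3), ((0, 10), 2)]:
            # tuple comparison: (0, 10, 1) >= (0, 10) but not >= (0, 11)
            if api_version >= min_broker_version:
                return produce_version
        return 0  # pre-0.10 fallback (the real code also maps 0.9 to v1)

    assert produce_version_for((2, 3)) == 7
    assert produce_version_for((1, 1)) == 5
    assert produce_version_for((0, 10, 1)) == 2

Note that the sender caps at v7: the v8 branch in _handle_produce_response and
the v8 protocol classes below are defined, but nothing requests v8 yet.
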
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index f4032b311..9b3f6bf55 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -78,6 +78,50 @@ class ProduceResponse_v5(Response):
     )
 
 
+class ProduceResponse_v6(Response):
+    """
+    The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+    """
+    API_KEY = 0
+    API_VERSION = 6
+    SCHEMA = ProduceResponse_v5.SCHEMA
+
+
+class ProduceResponse_v7(Response):
+    """
+    V7 bumped up to indicate ZStandard capability. (see KIP-110)
+    """
+    API_KEY = 0
+    API_VERSION = 7
+    SCHEMA = ProduceResponse_v6.SCHEMA
+
+
+class ProduceResponse_v8(Response):
+    """
+    V8 bumped up to add two new fields to PartitionResponse: record_errors and error_message
+    (see KIP-467)
+    """
+    API_KEY = 0
+    API_VERSION = 8
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('log_start_offset', Int64),
+                ('record_errors', Array(
+                    ('batch_index', Int32),
+                    ('batch_index_error_message', String('utf-8'))
+                )),
+                ('error_message', String('utf-8'))
+            )))),
+        ('throttle_time_ms', Int32)
+    )
+
+
 class ProduceRequest(Request):
     API_KEY = 0
 
@@ -106,6 +150,7 @@ class ProduceRequest_v1(ProduceRequest):
     RESPONSE_TYPE = ProduceResponse_v1
     SCHEMA = ProduceRequest_v0.SCHEMA
 
+
 class ProduceRequest_v2(ProduceRequest):
     API_VERSION = 2
     RESPONSE_TYPE = ProduceResponse_v2
@@ -147,11 +192,41 @@ class ProduceRequest_v5(ProduceRequest):
     SCHEMA = ProduceRequest_v4.SCHEMA
 
 
+class ProduceRequest_v6(ProduceRequest):
+    """
+    The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+    """
+    API_VERSION = 6
+    RESPONSE_TYPE = ProduceResponse_v6
+    SCHEMA = ProduceRequest_v5.SCHEMA
+
+
+class ProduceRequest_v7(ProduceRequest):
+    """
+    V7 bumped up to indicate ZStandard capability. (see KIP-110)
+    """
+    API_VERSION = 7
+    RESPONSE_TYPE = ProduceResponse_v7
+    SCHEMA = ProduceRequest_v6.SCHEMA
+
+
+class ProduceRequest_v8(ProduceRequest):
+    """
+    V8 bumped up to add two new fields to PartitionResponse: record_errors and error_message
+    (see KIP-467)
+    """
+    API_VERSION = 8
+    RESPONSE_TYPE = ProduceResponse_v8
+    SCHEMA = ProduceRequest_v7.SCHEMA
+
+
 ProduceRequest = [
     ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
-    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
+    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
+    ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
 ]
 ProduceResponse = [
     ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
-    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
+    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
+    ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
 ]
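
Note: record_errors and error_message are per-partition fields, so they must
sit inside the partitions Array (KIP-467 nests them in PartitionResponse);
only then does each decoded partition entry match the seven-element unpack in
sender.py's v8 branch. A sketch with made-up values, not part of the patch:

    # Shape of one decoded v8 partition entry
    partition_info = (
        0,               # partition
        0,               # error_code (0 == NoError)
        42,              # offset assigned to the first record of the batch
        1580470282123,   # timestamp (ms; log-append time, or -1 for CreateTime)
        7,               # log_start_offset
        [],              # record_errors: [(batch_index, batch_index_error_message), ...]
        None,            # error_message -- the "global" error summary
    )
    partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
    assert log_start_offset == 7 and global_error is None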