diff --git a/changelog.md b/changelog.md index 71c30270a..4bccd59fe 100644 --- a/changelog.md +++ b/changelog.md @@ -12,6 +12,12 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). * Fix a segfault when calling `Consumer::position` on a consumer that was improperly configured ([#360]). +* **Breaking change.** Change `Consumer::store_offset` to accept the topic, + partition, and offset directly ([#89], [#368]). The old API, which took a + `BorrowedMessage`, is still accessible as + `Consumer::store_offset_from_message`. + +[#89]: https://github.com/fede1024/rust-rdkafka/issues/89 [#360]: https://github.com/fede1024/rust-rdkafka/issues/360 diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 6fbc0ee52..5048ffa26 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -384,7 +384,17 @@ where } } - fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { + fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> { + let topic = self.client.native_topic(topic)?; + let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) }; + if error.is_error() { + Err(KafkaError::StoreOffset(error.into())) + } else { + Ok(()) + } + } + + fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { let error = unsafe { rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset()) }; diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index b2d75c98c..3fb417b6e 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -238,10 +238,14 @@ where /// commit every message with lower offset within the same partition. fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>; - /// Stores offset for this message to be used on the next (auto)commit. When + /// Stores offset to be used on the next (auto)commit. When /// using this `enable.auto.offset.store` should be set to `false` in the /// config. - fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>; + fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>; + + /// Like [`Consumer::store_offset`], but the offset to store is derived from + /// the provided message. + fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>; /// Store offsets to be used on the next (auto)commit. When using this /// `enable.auto.offset.store` should be set to `false` in the config. diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 888550c6e..0e7b8906e 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -398,8 +398,12 @@ where self.base.commit_message(message, mode) } - fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { - self.base.store_offset(message) + fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> { + self.base.store_offset(topic, partition, offset) + } + + fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { + self.base.store_offset_from_message(message) } fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {