Skip to content

Commit

Permalink
Don't require message in Consumer::store_offset
Browse files Browse the repository at this point in the history
  • Loading branch information
benesch committed Jun 30, 2021
1 parent b4ec50e commit 556e372
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
* Fix a segfault when calling `Consumer::position` on a consumer that was
improperly configured ([#360]).

* **Breaking change.** Change `Consumer::store_offset` to accept the topic,
partition, and offset directly ([#89], [#368]). The old API, which took a
`BorrowedMessage`, is still accessible as
`Consumer::store_offset_from_message`.

[#89]: https://github.com/fede1024/rust-rdkafka/issues/89
[#360]: https://github.com/fede1024/rust-rdkafka/issues/360

<a name="0.26.0"></a>
Expand Down
12 changes: 11 additions & 1 deletion src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,17 @@ where
}
}

fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
let topic = self.client.native_topic(topic)?;
let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
if error.is_error() {
Err(KafkaError::StoreOffset(error.into()))
} else {
Ok(())
}
}

fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
let error = unsafe {
rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
};
Expand Down
8 changes: 6 additions & 2 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ where
/// commit every message with lower offset within the same partition.
fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>;

/// Stores offset for this message to be used on the next (auto)commit. When
/// Stores offset to be used on the next (auto)commit. When
/// using this `enable.auto.offset.store` should be set to `false` in the
/// config.
fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>;

/// Like [`Consumer::store_offset`], but the offset to store is derived from
/// the provided message.
fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;

/// Store offsets to be used on the next (auto)commit. When using this
/// `enable.auto.offset.store` should be set to `false` in the config.
Expand Down
8 changes: 6 additions & 2 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,12 @@ where
self.base.commit_message(message, mode)
}

fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
self.base.store_offset(message)
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
self.base.store_offset(topic, partition, offset)
}

fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
self.base.store_offset_from_message(message)
}

fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
Expand Down

0 comments on commit 556e372

Please sign in to comment.