
committablePartitionedManualOffsetSource does not use getOffsetsOnAssign on rebalance #1086

Closed · francisdb opened this issue Mar 27, 2020 · 31 comments

@francisdb

Versions used

private val akkaVersion          = "2.6.4"
private val alpakkaKafkaVersion  = "2.0.2"

Expected Behavior

We do manual offset management: we start from the last snapshot offset and replay without side-effects up to the committed offset, then we enable side-effects.
We expect the stream to make monotonic progress, with offsets incrementing one by one. If the offset goes backwards we ignore it; if it jumps by more than 1 we throw an error, as this indicates we are skipping messages.

Phases (a rough sketch of this wiring follows the list):

  1. load the snapshot with its offset x (getOffsetsOnAssign)
  2. load the last committed offset y (getOffsetsOnAssign)
  3. alpakka-kafka seeks to offset x and starts the stream
  4. we process messages without side-effects until we reach y
  5. we process messages with side-effects enabled
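
For context, a minimal sketch of how these phases are wired (not our production code, and assuming Alpakka Kafka 2.0.x): loadSnapshotOffset, lookupReplayBoundary and process are hypothetical placeholders for our snapshot store, the committed-offset lookup and the actual message handling; the monotonic-offset check is left out for brevity.

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future
import scala.concurrent.duration._

object ManualOffsetSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("mygroup")

  val committerSettings = CommitterSettings(system).withMaxInterval(2.seconds)

  // Hypothetical helpers: snapshot offset x (phase 1) and replay boundary y (phase 2).
  def loadSnapshotOffset(tp: TopicPartition): Future[Long] = Future.successful(0L)
  def lookupReplayBoundary(tp: TopicPartition): Future[Long] = Future.successful(0L)
  def process(value: String, sideEffects: Boolean): Unit = ()

  // The offsets returned here are where alpakka-kafka seeks on assignment (phase 3).
  def getOffsetsOnAssign(assigned: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
    Future.traverse(assigned)(tp => loadSnapshotOffset(tp).map(tp -> _)).map(_.toMap)

  Consumer
    .committablePartitionedManualOffsetSource(
      consumerSettings,
      Subscriptions.topics("mytopic"),
      getOffsetsOnAssign
    )
    .mapAsyncUnordered(4) { case (tp, partitionSource) =>
      lookupReplayBoundary(tp).flatMap { replayUpTo =>
        partitionSource
          .map { msg =>
            // phases 4/5: side-effects stay off until the committed offset y is reached
            process(msg.record.value, sideEffects = msg.record.offset >= replayUpTo)
            msg.committableOffset
          }
          .runWith(Committer.sink(committerSettings)) // commit back to Kafka as a backup
      }
    }
    .runWith(Sink.ignore)
}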

Actual Behavior

On a rebalance during the replay phase (4), when a partition is revoked and directly reassigned, we see that alpakka-kafka keeps the stream open and seeks to the last committed offset (y), which skips a lot of messages and messes up the replay.

This is a common case on redeploy, where successive rebalances occur as nodes are replaced.

Relevant logs

[Consumer clientId=xxx-3aaf87e5-707e-4a78-8d1e-b67280ae98b1@xxx-869655c6-97pq2, groupId=mygroup] Setting offset for partition mypartition-2 to the committed offset FetchPosition{offset=10675950, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=zzz.europe-west1.gcp.confluent.cloud:9092 (id: 11 rack: 2), epoch=130}}

How to solve this?

Does the above behavior make sense for manual offset handling?
Is this configurable?

Without optimizations we could just go for the simple solution of not keeping the stream open on a quick revoke/assign.

Any other suggestions?

@seglo commented Mar 27, 2020

Thanks for raising this issue @francisdb. Are you saying that getOffsetsOnAssign is not being called for the newly assigned partitions during your replay phase?

@francisdb

No, for existing assignments that get revoked and directly assigned again there is this seek to the last committed offset.

@seglo commented Mar 27, 2020

For partitions that were already assigned to the same consumer group member?

@francisdb

Yes

Logs indicate, for example:
revoke 1 2 3
assign 1 2 3 4 5

4 & 5 start properly with a new stream after the call to getOffsetsOnAssign(1 2 3 4 5)
1 2 3 seek to the last committed offset instead of the result from getOffsetsOnAssign

@francisdb francisdb changed the title committablePartitionedManualOffsetSource does not call getOffsetsOnAssign on rebalance committablePartitionedManualOffsetSource does not use getOffsetsOnAssign on rebalance Mar 28, 2020
@seglo commented Mar 30, 2020

Got it. Is this something you would like to take on? If not, I can add it to my backlog.

@seglo seglo self-assigned this Mar 30, 2020
@francisdb

To be honest I have a hard time understanding all those callbacks and internal state handling.

@francisdb commented Mar 31, 2020

I investigated a bit more and I see that on assignment the Kafka driver automatically seeks to the last committed offset for all assigned partitions:

if partitions 1 2 3 4 5 are assigned, this is logged for all of them, even before getOffsetsOnAssign([1,2,3,4,5]) is called

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator | Setting offset for partition mypartition-[partitionId] to the committed offset

@seglo commented Mar 31, 2020

Yes, we do some optimizations here as well for the committable partitioned sources when partitions are reassigned back to the same group member, so that they don't replay too many messages that may be "in flight" within the stream. For the manual offset sources we should call the getOffsetsOnAssign function, though.

@seglo commented Apr 15, 2020

@francisdb I did some digging into this and I'm thinking about how to proceed. I've discovered that the getOffsetsOnAssign event handler is actually getting called on rebalance, but it is superseded in certain situations when a committed offset already exists for the partition. This is only a problem when using committablePartitionedManualOffsetSource. plainPartitionedManualOffsetSource should be unaffected.

I have a few questions for you:

  1. One condition where this happens is during the process of "commit refreshing". This involves querying the broker on some interval to keep group offsets from expiring on the cluster. Alpakka Kafka provides a feature to work around an offset expiry bug that was fixed in Apache Kafka 2.1.0. Out of curiosity, are you using this feature by overriding the default Alpakka Kafka configuration for akka.kafka.consumer.commit-refresh-interval to some interval?
  2. When do you commit offsets to Kafka with respect to committing to your external offset store?
  3. Is it possible for you to use plainPartitionedManualOffsetSource instead?

@francisdb

We have a hosted Kafka solution in the same cloud datacenter (Confluent Cloud).

  1. No use of akka.kafka.consumer.commit-refresh-interval in our code:
akka.kafka.consumer {
  // Don't wait when stopping the stream (because we're using external offset storage)
  // See https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#settings
  stop-timeout = 0s

  // Relaxed commit timeout to align with use on Confluent Cloud
  commit-timeout = 25s
  commit-time-warning = 10s

  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {

    #Flag to turn on connection checker
    enable = true
  }

}

and

ConsumerSettings(actorSystem,...,...)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

When do you commit offsets to Kafka with respect to committing to your external offset store?

We store snapshots (external offset store) every 1000 messages to an object store.
We use a Producer.committableSink with CommitterSettings(actorSystem).withMaxInterval(2.seconds).

Is it possible for you to use plainPartitionedManualOffsetSource instead?

I inherited this project and am not sure why we chose committablePartitionedManualOffsetSource over this one. How are messages then committed? It's not directly clear from the scaladocs.

    import akka.kafka.CommitterSettings
    import akka.kafka.scaladsl.Producer
    import akka.stream.{ActorAttributes, Supervision}
    import org.apache.kafka.clients.consumer.{CommitFailedException, RetriableCommitFailedException}

    import scala.concurrent.duration._

    lazy val committerSettings: CommitterSettings = CommitterSettings(actorSystem).withMaxInterval(2.seconds)

    val operationResultMessageSink = Producer
      .committableSink(runtimeOperationFeedbackProducerSettings, committerSettings)
      .withAttributes(ActorAttributes.supervisionStrategy {
        case e: RetriableCommitFailedException =>
          logger.warn(s"Ignoring RetriableCommitFailedException for RuntimeOperationTopic: ${e.getMessage}")
          Supervision.Resume
        case e: CommitFailedException if e.getMessage.contains("already rebalanced") =>
          // Commit cannot be completed since the group has already rebalanced and assigned the partitions to another
          // member. This means that the time between subsequent calls to poll() was longer than the
          // configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time
          // message processing. You can address this either by increasing max.poll.interval.ms or by reducing
          // the maximum size of batches returned in poll() with max.poll.records.
          // TODO should normally be fixed but maybe not for the setup we use
          //   https://github.com/akka/alpakka-kafka/issues/755
          logger.warn(s"Ignoring CommitFailedException when rebalanced for RuntimeOperationTopic: ${e.getMessage}")
          Supervision.Resume
        case _ => Supervision.Stop
      })

@seglo commented Apr 15, 2020

Thanks for the information @francisdb.

I inherited this project and am not sure why we chose committablePartitionedManualOffsetSource over this one. How are messages then committed? It's not directly clear from the scaladocs.

The committablePartitionedManualOffsetSource was added for users who want to store the offset externally (and start from their stored offsets) as well as commit offsets to Kafka, so that they can use it as a backup to their stored offsets and/or with Kafka ecosystem tools that report metrics on consumer groups, such as lag. If neither of these reasons applies to you, then you could use Consumer.plainPartitionedManualOffsetSource with a Producer.plainSink to work around the issue.

I acknowledge that committablePartitionedManualOffsetSource still has a problem, but it's not used by a great deal of Alpakka Kafka users. It was only introduced last fall at the request of a Lightbend client (not sure if it's the same organization you work for?) for the use case I described above.
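
A rough sketch of that workaround, with placeholder settings, topic names and an externalOffsets helper standing in for the external offset store (nothing is committed back to Kafka):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

import scala.concurrent.Future

object PlainManualOffsetSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("mygroup")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // Hypothetical lookup of the externally stored offsets for the assigned partitions.
  def externalOffsets(assigned: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
    Future.successful(assigned.map(_ -> 0L).toMap)

  Consumer
    .plainPartitionedManualOffsetSource(consumerSettings, Subscriptions.topics("mytopic"), externalOffsets)
    .flatMapMerge(4, { case (_, partitionSource) => partitionSource })
    // placeholder transformation: produce the processed result to an output topic
    .map(record => new ProducerRecord[String, String]("output-topic", record.key, record.value))
    .runWith(Producer.plainSink(producerSettings))
}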

@francisdb

We need both offsets: storing snapshots is rather expensive, and we need to know how far past the snapshot we have processed messages (to avoid duplicated side-effects). This is a typical event-sourcing solution for when state is kept for a long time (months) and a lot of messages have to be handled. (We are not that Lightbend client.)

@seglo commented Apr 16, 2020

Got it. Is there any reason why you chose not to store/commit offsets in external storage too, apart from your occasional snapshot?

@seglo commented Apr 16, 2020

How are messages then committed? It's not directly clear from the scaladocs.

I missed this earlier question. With the plain sources, offsets aren't committed back to Kafka. The user stores them somewhere else and either uses a ManualSubscription to provide a partition-to-offset map at startup, or the ManualOffsetSource sources. The latter has the benefit of using consumer groups.

https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-external-to-kafka
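
For illustration, a brief sketch of the ManualSubscription option; the topic, bootstrap servers and the startOffsets values are placeholders standing in for offsets read from the external store at startup:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ManualSubscriptionSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")

  // offsets read from the external store at startup (placeholder values)
  val startOffsets: Map[TopicPartition, Long] = Map(new TopicPartition("mytopic", 0) -> 12345L)

  // ManualSubscription: no consumer group coordination, partitions and offsets are fixed up front
  val source = Consumer.plainSource(settings, Subscriptions.assignmentWithOffset(startOffsets))
}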

@francisdb

Is there any reason why you chose not to store/commit offsets in external storage too, apart from your occasional snapshot?

Those snapshots are not occasional. We handle a lot of messages, and 1000 messages on a partition is not a lot, so snapshots are being written frequently.

Regarding the storing: I guess there is no reason not to store them in Kafka? Storing offsets externally would require an extra stateful datastore, which we try to avoid (snapshots are stored on S3/Google Cloud Storage).

To be honest we have not yet reached a stable point where we can start tweaking partitions/snapshot/commit intervals. We even decided to put our Kafka efforts aside for a while because of this issue. As long as we have a single consumer everything works perfectly, but once we get rebalances we end up in a mess.

@seglo commented Apr 20, 2020

@francisdb Is it possible for you to produce a snapshot and try out the fix in #1100?

@francisdb

I will try to have our test system running with this.

@seglo commented Apr 27, 2020

@francisdb We're planning an Alpakka Kafka release this week, so it would be a good opportunity to get this solved soon. Have you had a chance to see if this works for you?

@francisdb

Not yet, we have a fork with some changes regarding #1066 that I need to update first. But it's next on my todo list, so it should not take long.

@seglo commented Apr 27, 2020

Ok, thanks!

@francisdb

getOffsetsOnAssign seems to be called twice now

consumer reports assignments
getOffsetsOnAssign(0,1,2,3)
getOffsetsOnAssign() succeeded in PT16.266S
consumer reports: Seeking to offset 15001 for partition ....
(above for each assigned partition)
getOffsetsOnAssign(0,1,2,3)
CommittableSubSourceStageLogic reports starting substream
getOffsetsOnAssign() succeeded in PT15.101S
consumer reports: Seeking to offset 15001 for partition ....
(above for each assigned partition)

and we still have newer messages arriving after re-assign (could be related to #1038)
(log view from partition 2)

getOffsetsOnAssign(2, ...)
partition 2 stream completed
getOffsetsOnAssign() succeeded in PT19.856S
consumer reports: Seeking to offset 15001 for partition 2
getOffsetsOnAssign(2, ...)
partition 2 stream starts
getOffsetsOnAssign() succeeded in PT19.856S
consumer reports: Seeking to offset 15001 for partition 2
stream logic reports: Invalid future offset 15501 on partition 2

@seglo commented Apr 29, 2020

@francisdb Thanks for the update. I'll dig into this some more later, but it may not be before the next release. One question regarding your log view from partition 2:

getOffsetsOnAssign(2, ...)
partition 2 stream completed
getOffsetsOnAssign() succeeded in PT19.856S
consumer reports: Seeking to offset 15001 for partition 2
getOffsetsOnAssign(2, ...)
partition 2 stream starts
getOffsetsOnAssign() succeeded in PT19.856S
consumer reports: Seeking to offset 15001 for partition 2

Is this ^ the logging from Kafka's ConsumerCoordinator or Alpakka Kafka? I assume that when the problem occurs you always observe Kafka's logging of reseeking to the last committed offset after the Alpakka Kafka logging.

stream logic reports: Invalid future offset 15501 on partition 2

@francisdb commented Apr 29, 2020

These are the details of that log line:

  level: "INFO"   
  logger: "org.apache.kafka.clients.consumer.KafkaConsumer"   
  message: "[Consumer clientId=myclientid, groupId=foo] Seeking to offset 15001 for partition foo-2"   
  thread: "kafka-runtime-akka.kafka.default-dispatcher-14"   
  timestamp: "2020-04-28 19:27:57.540"   

I was thinking of putting together a small project that reproduces this issue, as I understand it's not easy to debug like this; however, that might take quite a bit of my time.

@seglo commented Apr 29, 2020

Thanks. Does this log line always occur after the similar Alpakka Kafka logging Synchronously seeking to offsets during rebalance ...?

I was thinking to come up with a small project that reproduces this issue as I understand it's not easy to debug like this, however that might take quite some of my time

That probably isn't necessary. I thought I constructed a test that reproduced this reliably, but maybe I misread the ordering of seeks between Kafka Consumer internals and the Alpakka Kafka logic.

I thought this was caused by a timing issue of Alpakka Kafka seeking to external offsets after a rebalance was complete. I'm not sure if there's any way to know when the Consumer is doing this internally, since it happens after the rebalance. I'll have to dig more to see if it's truly a background thread or triggered in the first poll after a rebalance. If it's the latter, maybe we can check after an empty poll whether the seek position was reset, and then seek to the external offsets before polling again.
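
Purely to illustrate that idea (this is not Alpakka Kafka's internal code): with the plain KafkaConsumer it would amount to something like the following, where externalOffsets is the map of offsets we previously seeked to from getOffsetsOnAssign.

import java.time.Duration

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object ReSeekSketch {
  def reSeekIfReset(consumer: KafkaConsumer[String, String],
                    externalOffsets: Map[TopicPartition, Long]): Unit = {
    // one poll after the rebalance has completed (any returned records would still need handling)
    consumer.poll(Duration.ZERO)
    externalOffsets.foreach { case (tp, wanted) =>
      // if the consumer silently moved back to the committed offset, seek to the external offset again
      if (consumer.position(tp) != wanted) consumer.seek(tp, wanted)
    }
  }
}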

@francisdb

Does this log line always occur after the similar Alpakka Kafka logging Synchronously seeking to offsets during rebalance ...?

We don't have debug logging enabled on our staging system, so I can't tell for now.

@francisdb

Do you need me to do another run with debug logging enabled?

@seglo commented May 5, 2020

@francisdb Sure, that would be helpful for me to assert that I recreated your use case.

@francisdb

Closing, as nobody else seems to have this issue and we are no longer using alpakka-kafka for this use case.

@ennru ennru added this to the invalid/not release-bound milestone Oct 22, 2020
@feliperazeek

@francisdb Question: how did you do "we process messages without side-effects until we reach y" and "we process messages with side-effects enabled"?

@francisdb commented Aug 16, 2021

@francisdb Question: how did you do "we process messages without side-effects until we reach y" and "we process messages with side-effects enabled"?

We are now using the native blocking client, which allows loading snapshots, and we store our snapshots with their offset on a cloud object store.

This is what currently works for us. Everything runs in the blocking consumer thread.

We have a snapshot-loading ConsumerRebalanceListener (roughly sketched after this list) that:

  • loads the last snapshots for the assigned partitions
  • seeks to the offset of the last snapshot for each partition (x)
  • looks up the last committed offsets (y)
  • restores the model from the snapshot and stores the initial y in a per-partition map shared with the consumer
  • pauses the assigned partitions (see the overflow queue case below)
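
A rough sketch of that listener (not our exact code; Scala 2.13, plain Kafka client): loadSnapshot and restoreModel are hypothetical placeholders for the snapshot store, and replayUpTo is the shared per-partition map of initial y values.

import java.util.{Collection => JCollection, Collections}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.concurrent.TrieMap
import scala.jdk.CollectionConverters._

final case class Snapshot(offset: Long, state: Map[String, String])

class SnapshotRebalanceListener(
    consumer: KafkaConsumer[String, String],
    replayUpTo: TrieMap[TopicPartition, Long] // shared with the consumer loop
) extends ConsumerRebalanceListener {

  // Hypothetical snapshot store lookup and model restore.
  private def loadSnapshot(tp: TopicPartition): Snapshot = Snapshot(0L, Map.empty)
  private def restoreModel(tp: TopicPartition, snapshot: Snapshot): Unit = ()

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach { tp =>
      val snapshot = loadSnapshot(tp)                       // last snapshot with offset x
      consumer.seek(tp, snapshot.offset)                    // seek to x
      val committed = consumer.committed(Collections.singleton(tp)).get(tp)
      val y = Option(committed).map(_.offset).getOrElse(snapshot.offset)
      replayUpTo.put(tp, y)                                 // initial y, shared with the consumer
      restoreModel(tp, snapshot)
      consumer.pause(Collections.singleton(tp))             // resumed once the overflow queue is empty
    }

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach(tp => replayUpTo.remove(tp))
}

The listener would be registered via consumer.subscribe(topics, listener), and the consumer loop resumes the paused partitions once the overflow queue is empty.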

The consumer:

  • if the effect overflow queue is empty resumes consumption
  • if the effect overflow queue is not empty tries to send out previous effects
  • if the effect overflow queue is empty handles some messages, keeps track of effects to handle, drops everything below the initialy, then sends out those effects on a kafka streams queue, if one fails to be queued we keep the rest as overflow and pause consumption (poor man's back pressure)
  • commit the offset of the last sent out effect y'
  • periodically sends out snapshots to store (state + x')

With alpakka-kafka we were blocked on:

  • passing the loaded snapshot and y from the listener to the partitioned stream start
  • guaranteeing correct seeks (stop the world / flush buffers during rebalance)

@feliperazeek

thank you @francisdb
