-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mitigate errors in rebalances with partitions in SubSourceLogic #466
Conversation
Hi @dvallejo, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement: |
CLA signed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for trying to fix this!
Looks quite reasonable.
@@ -84,6 +84,9 @@ akka.kafka.consumer { | |||
# Disable auto-commit by default | |||
enable.auto.commit = false | |||
} | |||
|
|||
# Time to wait for pending requests when a partition is closed | |||
wait-close-partition = 5s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5 seconds as default sounds like a very long time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally agree. Changed to 500 ms
partitionsToRevoke = partitionsToRevoke -- tps | ||
|
||
getOffsetsOnAssign.fold(pumpCB.invoke(topicsToBeAssigned)) { getOffsets => | ||
getOffsets(topicsToBeAssigned) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea to restrict to the "new" ones.
Wouldn't topicPartitions
or partitions
be a better name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally agree. Changed to partitions
materializer.scheduleOnce( | ||
settings.waitClosePartition, | ||
new Runnable { | ||
override def run(): Unit = cb.invoke(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
() => cb.invoke()
should be enough.
@@ -332,7 +332,7 @@ class IntegrationSpec extends SpecBase(kafkaPort = KafkaPorts.IntegrationSpec) { | |||
} | |||
} | |||
|
|||
"call the onRevoked hook" in { | |||
"call the onRevoked hook" ignore { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test was ignored because the new policy about re-use partitions. I've re-activated the test again. Now, we have to wait for the default time to ensure that the old partition is closed and a new one will be created.
Sorry, I misguided you - Scala 2.11 requires the Runnable. |
No problem. I´ve reverted to Runnable |
9708578
to
372821b
Compare
372821b
to
350367e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Congratulations! You're the first to experience the MigrationManager checks to fail.
We want to become serious on binary compatibility between versions and this tool helps us. The error indicated on Travis shows that introducing the parameter in the middle of things pushed a default parameter in the copy method around.
@@ -291,6 +292,7 @@ class ConsumerSettings[K, V]( | |||
val closeTimeout: FiniteDuration, | |||
val commitTimeout: FiniteDuration, | |||
val wakeupTimeout: FiniteDuration, | |||
val waitClosePartition: FiniteDuration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move the new parameter last.
Oh, I was going crazy. I didn´t understand the fail 😄 Thanks!! I´ve changed the field to the last position. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more suggestions.
It might be worthwhile to sprinkle a few log messages into the logic, WDYT?
} | ||
} | ||
} | ||
} | ||
|
||
var partitionsToRevoke: Set[TopicPartition] = Set.empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move these vars up to the other mutable fields, please.
.onComplete { | ||
case Failure(ex) => stageFailCB.invoke(new ConsumerFailed(s"Failed to fetch offset for partitions: $tps.", ex)) | ||
case Failure(ex) => stageFailCB.invoke(new ConsumerFailed(s"Failed to fetch offset for partitions: $partitions.", ex)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ${partitions.mkString(", ")}
.recover { | ||
case _: AskTimeoutException => stageFailCB.invoke(new ConsumerFailed(s"Consumer failed during seek for partitions: $tps.")) | ||
case _: AskTimeoutException => stageFailCB.invoke(new ConsumerFailed(s"Consumer failed during seek for partitions: $partitions.")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above.
} | ||
} | ||
} | ||
} | ||
|
||
var partitionsToRevoke: Set[TopicPartition] = Set.empty | ||
var revokePendingCall: Option[Cancellable] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is more like pendingRevokeCall
.
materializer.scheduleOnce( | ||
settings.waitClosePartition, | ||
new Runnable { | ||
override def run(): Unit = cb.invoke(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could change revokePendingCall
to None
after invoking.
Totally agree. I've added some log messages and your requests. If you think there is not enough log messages, feel free to request more. Thanks for the review! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some details...
val partitions = tps -- partitionsToRevoke | ||
|
||
if (partitions.nonEmpty) { | ||
log.debug(s"Assigning new partitions: ${partitions.mkString(", ")}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For logging you should use the log.debug("message {}", value)
notation as it defers string creation.
In the particular case you might just add log.isDebugEnabled
to the if clause.
subSources --= partitionsToRevoke | ||
} | ||
|
||
log.debug(s"Waiting ${settings.waitClosePartition.toMillis} ms for pending requests before close partitions") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might just use settings.waitClosePartition
which will print its unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I'll fix the logging.
Thank you for your contribution. Keep them coming! |
This fix mitigates the problem about close partitions with pending requests. It is related with this issue #382
To mitigate this problem I´ve wait a configured time before close the partition. This time can be configured with the property "wait-close-partition".
Also, I´ve added the possibility to re-use partitions. If we are going rebalance from 1 partition to 2, it´s not necessary close the original partition, just open a new one is necessary.
I had to ignore one IT because now a revoke is not required if we can re-use a partition.
I don´t have much experience with akka-streams or reactive kafka so any feedback is welcome.