Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mitigate errors in rebalances with partitions in SubSourceLogic #466

Merged
merged 11 commits into from
May 29, 2018

Conversation

dvallejo
Copy link
Contributor

This fix mitigates the problem about close partitions with pending requests. It is related with this issue #382

To mitigate this problem I´ve wait a configured time before close the partition. This time can be configured with the property "wait-close-partition".
Also, I´ve added the possibility to re-use partitions. If we are going rebalance from 1 partition to 2, it´s not necessary close the original partition, just open a new one is necessary.

I had to ignore one IT because now a revoke is not required if we can re-use a partition.

I don´t have much experience with akka-streams or reactive kafka so any feedback is welcome.

@lightbend-cla-validator

Hi @dvallejo,

Thank you for your contribution! We really value the time you've taken to put this together.

Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement:

http://www.lightbend.com/contribute/cla

@dvallejo
Copy link
Contributor Author

CLA signed

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for trying to fix this!
Looks quite reasonable.

@@ -84,6 +84,9 @@ akka.kafka.consumer {
# Disable auto-commit by default
enable.auto.commit = false
}

# Time to wait for pending requests when a partition is closed
wait-close-partition = 5s
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 seconds as default sounds like a very long time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree. Changed to 500 ms

partitionsToRevoke = partitionsToRevoke -- tps

getOffsetsOnAssign.fold(pumpCB.invoke(topicsToBeAssigned)) { getOffsets =>
getOffsets(topicsToBeAssigned)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to restrict to the "new" ones.
Wouldn't topicPartitions or partitions be a better name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree. Changed to partitions

materializer.scheduleOnce(
settings.waitClosePartition,
new Runnable {
override def run(): Unit = cb.invoke(())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

() => cb.invoke() should be enough.

@@ -332,7 +332,7 @@ class IntegrationSpec extends SpecBase(kafkaPort = KafkaPorts.IntegrationSpec) {
}
}

"call the onRevoked hook" in {
"call the onRevoked hook" ignore {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this ignored?

Copy link
Contributor Author

@dvallejo dvallejo May 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was ignored because the new policy about re-use partitions. I've re-activated the test again. Now, we have to wait for the default time to ensure that the old partition is closed and a new one will be created.

@ennru
Copy link
Member

ennru commented May 23, 2018

Sorry, I misguided you - Scala 2.11 requires the Runnable.

@dvallejo
Copy link
Contributor Author

No problem. I´ve reverted to Runnable

@dvallejo dvallejo force-pushed the fix-rebalances-with-partitions branch from 9708578 to 372821b Compare May 23, 2018 15:13
@dvallejo dvallejo force-pushed the fix-rebalances-with-partitions branch from 372821b to 350367e Compare May 23, 2018 15:37
Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Congratulations! You're the first to experience the MigrationManager checks to fail.
We want to become serious on binary compatibility between versions and this tool helps us. The error indicated on Travis shows that introducing the parameter in the middle of things pushed a default parameter in the copy method around.

@@ -291,6 +292,7 @@ class ConsumerSettings[K, V](
val closeTimeout: FiniteDuration,
val commitTimeout: FiniteDuration,
val wakeupTimeout: FiniteDuration,
val waitClosePartition: FiniteDuration,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the new parameter last.

@dvallejo
Copy link
Contributor Author

Oh, I was going crazy. I didn´t understand the fail 😄 Thanks!! I´ve changed the field to the last position.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more suggestions.
It might be worthwhile to sprinkle a few log messages into the logic, WDYT?

}
}
}
}

var partitionsToRevoke: Set[TopicPartition] = Set.empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these vars up to the other mutable fields, please.

.onComplete {
case Failure(ex) => stageFailCB.invoke(new ConsumerFailed(s"Failed to fetch offset for partitions: $tps.", ex))
case Failure(ex) => stageFailCB.invoke(new ConsumerFailed(s"Failed to fetch offset for partitions: $partitions.", ex))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ${partitions.mkString(", ")}

.recover {
case _: AskTimeoutException => stageFailCB.invoke(new ConsumerFailed(s"Consumer failed during seek for partitions: $tps."))
case _: AskTimeoutException => stageFailCB.invoke(new ConsumerFailed(s"Consumer failed during seek for partitions: $partitions."))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

}
}
}
}

var partitionsToRevoke: Set[TopicPartition] = Set.empty
var revokePendingCall: Option[Cancellable] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is more like pendingRevokeCall.

materializer.scheduleOnce(
settings.waitClosePartition,
new Runnable {
override def run(): Unit = cb.invoke(())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could change revokePendingCall to None after invoking.

@dvallejo
Copy link
Contributor Author

Totally agree. I've added some log messages and your requests. If you think there is not enough log messages, feel free to request more. Thanks for the review!

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some details...

val partitions = tps -- partitionsToRevoke

if (partitions.nonEmpty) {
log.debug(s"Assigning new partitions: ${partitions.mkString(", ")}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For logging you should use the log.debug("message {}", value) notation as it defers string creation.
In the particular case you might just add log.isDebugEnabled to the if clause.

subSources --= partitionsToRevoke
}

log.debug(s"Waiting ${settings.waitClosePartition.toMillis} ms for pending requests before close partitions")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might just use settings.waitClosePartition which will print its unit.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I'll fix the logging.

@ennru ennru merged commit b63df63 into akka:master May 29, 2018
@ennru ennru added this to the 0.21 milestone May 29, 2018
@ennru
Copy link
Member

ennru commented May 29, 2018

Thank you for your contribution. Keep them coming!

@edrevo
Copy link
Contributor

edrevo commented Jun 6, 2018

yay! many thanks @dvallejo for this PR! @ennru, do you know when 0.21 will be released? 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants