KAFKA-14563: Remove Client-Side AddPartitionsToTxn Requests #16840
base: trunk
Conversation
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java

Also cc: @artemlivshits to take a look :)

clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
if (nodeApiVersions.finalizedFeatures() != null) {
    /*
    To enable the transaction V2, it requires:
    1. transaction.version finalized version >= 2
We should check every node's API versions (not just the coordinator's) and keep track of the latest transaction version (where "latest" is defined as the latest epoch). So that logic would be something like this:
- When we get API versions from any node, check whether the transaction manager has an epoch that is less than the new epoch.
- If the epoch stored in the transaction manager is less than the new epoch, store the new value and the new epoch in the transaction manager.
- Have a method:
  boolean coordinatorSupportsTransactionV2() { return this.transactionVersion >= 2; }
This way we learn the cluster-wide consensus on the transaction version, rather than relying on individual brokers that may have different values, so our view doesn't jump back and forth (if the coordinator moves, for example).
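The epoch-gated update described above can be sketched as follows; the class, field, and method names here are illustrative stand-ins, not the actual Kafka client code.

```java
// Illustrative sketch of the suggestion: only accept a transaction
// version whose finalized-features epoch is newer than the one already
// stored, so the client converges on the cluster-wide view. All names
// are hypothetical, not the real TransactionManager fields.
class TransactionVersionTracker {
    private long finalizedFeaturesEpoch = -1;
    private short transactionVersion = 0;

    synchronized void maybeUpdate(long epoch, short version) {
        // Ignore stale reports from nodes with an older epoch.
        if (epoch > this.finalizedFeaturesEpoch) {
            this.finalizedFeaturesEpoch = epoch;
            this.transactionVersion = version;
        }
    }

    synchronized boolean coordinatorSupportsTransactionV2() {
        return this.transactionVersion >= 2;
    }
}
```

The stale-epoch guard is what keeps the view from flapping when different brokers report different finalized features.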
This is a good point. I agree 👍
When we get API versions from any node
The optimal way of handling this is to have an ApiVersionsRequest callback and check the transaction version in it. However, it is a bit tricky here. The ApiVersionsRequest is handled in the KafkaClient, a layer below the Sender class, and it does not expose any interaction point other than the apiVersions object. If we want to register a callback, we would need an interface change in KafkaClient, which may require a KIP?
A suboptimal way is to check the apiVersions for all the node features when calling coordinatorSupportsTransactionV2(). I guess it is acceptable as the check does not cost much. If so, I guess we don't need to store the epoch and transactionVersion anymore.
Maybe I'm missing something, but can we store this within ApiVersions and update the value every time a new ApiVersions response is handled? We already call apiVersions.update(node, nodeVersionInfo);
and that does a calculation for the max produce magic, so could we add another method in update to check the epoch and, if it is greater than the existing epoch, update the internal value in ApiVersions? (We may need to add the epoch to NodeApiVersions -- and this is not a public API.)
I did something a bit different here. Every time the ApiVersions are updated, the maxFinalizedFeaturesEpoch among the NodeApiVersions is stored. Then the transaction manager can just check this maxFinalizedFeaturesEpoch to determine if it needs to check the feature updates.
I think it gets the same thing accomplished. 👍
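A minimal sketch of the approach agreed on in this thread, keeping only the finalized features reported under the highest finalizedFeaturesEpoch; the class and signatures are simplified stand-ins for the real ApiVersions, not the actual code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ApiVersions: on every update, retain only the
// finalized features that arrived with the highest epoch seen so far,
// so consumers read one cluster-wide view rather than per-node values.
class FinalizedFeaturesView {
    private long maxFinalizedFeaturesEpoch = -1;
    private Map<String, Short> finalizedFeatures = Collections.emptyMap();

    synchronized void update(long epoch, Map<String, Short> features) {
        if (epoch > maxFinalizedFeaturesEpoch) {
            maxFinalizedFeaturesEpoch = epoch;
            finalizedFeatures = new HashMap<>(features);
        }
    }

    synchronized long maxFinalizedFeaturesEpoch() {
        return maxFinalizedFeaturesEpoch;
    }

    synchronized short finalizedFeature(String name) {
        // Features that were never finalized report level 0.
        return finalizedFeatures.getOrDefault(name, (short) 0);
    }
}
```

The transaction manager can then consult maxFinalizedFeaturesEpoch() to decide whether it needs to re-check the feature levels.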
d465e8f to 61a661e
Hmm, have we checked the tests @CalvinConfluent? There seem to be some issues. (Not just the threads, but some related to transactions.)
@@ -971,7 +987,7 @@ synchronized boolean isInitializing() {
         return isTransactional() && currentState == State.INITIALIZING;
     }
 
-    void handleCoordinatorReady() {
+    synchronized void handleCoordinatorReady() {
What's the reason to make this synchronized?
public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
    if (maxFinalizedFeaturesEpoch > nodeApiVersions.finalizedFeaturesEpoch()) {
I would expect that we still want to put the specific node's API versions into the per-node map. Then we can check the epoch and store the cluster-wide finalized features if it has a newer epoch. Something like this:
this.nodeApiVersions.put(nodeId, nodeApiVersions);
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
if (maxFinalizedFeaturesEpoch < nodeApiVersions.finalizedFeaturesEpoch()) {
    this.maxFinalizedFeaturesEpoch = nodeApiVersions.finalizedFeaturesEpoch();
    this.finalizedFeatures = nodeApiVersions.finalizedFeatures();
}
if (finalizedFeaturesEpoch == -1) {
    this.finalizedFeatures = Collections.emptyMap();
    return;
}
nit: do we need this special case, or would the code below just work? I think it's good to avoid extra code.
@@ -1570,6 +1576,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
             metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
         }
         this.interceptors.onAcknowledgement(metadata, exception);
+
nit: extra empty line?
@@ -397,6 +399,7 @@ private long sendProducerData(long now) {
                 // Update both readyTimeMs and drainTimeMs, this would "reset" the node
                 // latency.
                 this.accumulator.updateNodeLatencyStats(node.id(), now, true);
+                if (transactionManager != null) this.transactionManager.handleCoordinatorReady();
Do we still need this line, or is it a left-over from a previous version of this PR?
@@ -216,6 +221,10 @@ public String transactionalId() {
         return transactionalId;
     }
 
+    public boolean isTransactionV2Requested() {
Is this used?
@@ -216,6 +221,10 @@ public String transactionalId() {
         return transactionalId;
     }
 
+    public boolean isTransactionV2Requested() {
+        return version() > LAST_BEFORE_TRANSACTION_V2_VERSION;
Should we call the static function?
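One way to read this suggestion, sketched with a hypothetical class and a made-up version constant (the real constant's value in the PR may differ): keep the version comparison in a single static helper and have the instance method delegate to it, so there is only one place where the cutoff lives.

```java
// Hypothetical sketch: one static helper holds the version comparison,
// and the instance method delegates to it. The constant's value (3) is
// a placeholder, not the real value from the PR.
class TxnRequestVersionCheck {
    static final short LAST_BEFORE_TRANSACTION_V2_VERSION = 3;
    private final short version;

    TxnRequestVersionCheck(short version) {
        this.version = version;
    }

    // Static check, callable without constructing a request instance.
    static boolean isTransactionV2Requested(short version) {
        return version > LAST_BEFORE_TRANSACTION_V2_VERSION;
    }

    // Instance method delegates to the static helper.
    boolean isTransactionV2Requested() {
        return isTransactionV2Requested(version);
    }
}
```

Request builders that only know the target version can then call the static form, while code holding a request object uses the instance method.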
@@ -36,7 +36,12 @@
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
//
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-11",
//
// Version 12 is the same as version 11 (KIP-890). Note when produce requests are used in transaction, if
Should the extended comment be part of the request instead of response? So that it's consistent with TxnOffsetCommitRequest.
}

/**
 * This is an enum which handles the Partition Response based on the Request Version and the exact operation
 * defaultError: This is the default workflow which maps to cases when the Produce Request Version or the Txn_offset_commit request was lower than the first version supporting the new Error Class
 * genericError: This maps to the case when the clients are updated to handle the TransactionAbortableException
-* addPartition: This is a WIP. To be updated as a part of KIP-890 Part 2
+* addPartition: This allows the partition to be added to the transactions inflight with the Produce and TxnOffsetCommit requests.
Is this the right code model to represent client capabilities? The way it's coded it looks like there are 3 mutually exclusive options, but the options are not mutually exclusive: clients that support txnv2 can also support rollback error.
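The concern can be illustrated with a hedged sketch (all names invented, not from the PR): a capability set lets a client support both the abortable error and transaction V2 at the same time, which a single three-valued enum cannot express.

```java
import java.util.EnumSet;

// Invented names for illustration only: independent client capabilities
// modeled as a set of flags rather than mutually exclusive enum options,
// so a txnv2 client can also advertise abortable-error support.
enum ClientCapability { GENERIC_ERROR, TRANSACTION_V2 }

class ClientProfile {
    private final EnumSet<ClientCapability> caps;

    ClientProfile(EnumSet<ClientCapability> caps) {
        this.caps = caps;
    }

    boolean supports(ClientCapability c) {
        return caps.contains(c);
    }
}
```

With flags, the server-side handling can branch on each capability independently instead of enumerating every combination.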
@@ -848,6 +848,8 @@ class ReplicaManager(val config: KafkaConfig,
              Errors.COORDINATOR_NOT_AVAILABLE |
              Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
                s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+        case Errors.UNKNOWN_PRODUCER_ID => Some(new OutOfOrderSequenceException(
Can we add a comment on why we translate this error this way? Also, it looks like we don't return this error any more (at least not in the production code).
https://issues.apache.org/jira/browse/KAFKA-14563