
KAFKA-14563: Remove Client-Side AddPartitionsToTxn Requests #16840

Open · wants to merge 9 commits into trunk
Conversation


@CalvinConfluent CalvinConfluent changed the title [kafka-14563] RemoveClient-Side AddPartitionsToTxn Requests KAFKA-14563: RemoveClient-Side AddPartitionsToTxn Requests Aug 8, 2024
gradle/spotbugs-exclude.xml (outdated; review thread resolved)
@jolshan (Contributor) commented Aug 19, 2024:

Also cc: @artemlivshits to take a look :)

if (nodeApiVersions.finalizedFeatures() != null) {
/*
To enable the transaction V2, it requires:
1. transaction.version finalized version >= 2
Contributor:

We should check every node's API versions (not just the coordinator's) and keep track of the latest transaction version (where "latest" means the one reported with the latest epoch). The logic would be something like this:

  1. When we get API versions from any node, check if the transaction manager has the epoch that is less than the new epoch.
  2. If the epoch stored in the transaction manager is less than the new epoch, store the new value and the new epoch in the transaction manager.
  3. Have a method boolean coordinatorSupportsTransactionV2() { return this.transactionVersion >= 2; }

This way we learn the cluster-wide consensus on the transaction version, rather than the value from individual brokers, which may differ, so our view doesn't jump back and forth (if the coordinator moves, for example).
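The three steps above could be sketched roughly like this (a hypothetical helper for illustration only; the names are not the actual TransactionManager fields):

```java
// Sketch of the suggested epoch-gated tracking. Only a strictly newer
// finalizedFeaturesEpoch is allowed to change our view of the cluster-wide
// transaction version, so a stale broker cannot flip it back.
class TransactionVersionTracker {
    private long finalizedFeaturesEpoch = -1;
    private short transactionVersion = 0;

    // Steps 1-2: on any ApiVersions response, accept only a newer epoch.
    synchronized void maybeUpdate(long epoch, short txnVersion) {
        if (epoch > this.finalizedFeaturesEpoch) {
            this.finalizedFeaturesEpoch = epoch;
            this.transactionVersion = txnVersion;
        }
    }

    // Step 3: the check proposed in the comment.
    synchronized boolean coordinatorSupportsTransactionV2() {
        return this.transactionVersion >= 2;
    }
}
```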

Contributor:

This is a good point. I agree 👍

@CalvinConfluent (Contributor Author) commented Aug 21, 2024:

> When we get API versions from any node

The optimal way to handle this would be an ApiVersionsRequest callback that checks the transaction version. However, it is a bit tricky: the ApiVersionsRequest is handled in the KafkaClient, a layer below the Sender class, and it exposes no interaction point other than the apiVersions object. Registering a callback would require an interface change in KafkaClient, which may need a KIP.

A suboptimal way is to check apiVersions across all the node features whenever coordinatorSupportsTransactionV2 is called. I guess that is acceptable since the check does not cost much. If so, we don't need to store the epoch and transactionVersion anymore.
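The "check on every call" alternative described above could look something like this (an illustrative sketch; the types and names are assumptions, not the actual producer internals):

```java
import java.util.HashMap;
import java.util.Map;

// Recomputes the cluster-wide transaction version from scratch on each call,
// instead of caching an epoch/version pair in the transaction manager.
class NodeFeatureScan {
    // nodeId -> {finalizedFeaturesEpoch, finalized transaction.version}
    private final Map<String, long[]> nodeVersions = new HashMap<>();

    synchronized void onApiVersions(String nodeId, long epoch, long txnVersion) {
        nodeVersions.put(nodeId, new long[]{epoch, txnVersion});
    }

    // Take the transaction.version reported with the latest
    // finalizedFeaturesEpoch across all known nodes.
    synchronized boolean coordinatorSupportsTransactionV2() {
        long bestEpoch = -1;
        long txnVersion = 0;
        for (long[] v : nodeVersions.values()) {
            if (v[0] > bestEpoch) {
                bestEpoch = v[0];
                txnVersion = v[1];
            }
        }
        return txnVersion >= 2;
    }
}
```

The scan is O(number of known nodes) per call, which is why the comment argues the cost is acceptable.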

Contributor:

Maybe I'm missing something, but can we store this within ApiVersions and update the value every time a new apiVersions request is handled? We already call apiVersions.update(node, nodeVersionInfo), and that does a calculation for max produce magic, so could another method in update check the epoch and, if it is greater than the existing epoch, update the internal value in ApiVersions? (We may need to add the epoch to NodeApiVersions, but that is not a public API.)

Contributor Author:

I did something a bit different here. Every time the ApiVersions are updated, the maxFinalizedFeaturesEpoch among the NodeApiVersions is stored. Then the transaction manager can just check this maxFinalizedFeaturesEpoch to determine if it needs to check the feature updates.
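The flow described above could be sketched as follows on the transaction-manager side (illustrative names only, not the exact PR code): compare the epoch last processed against ApiVersions' maxFinalizedFeaturesEpoch and re-read features only when it has advanced.

```java
// The transaction manager remembers the last finalized-features epoch it
// processed; a feature re-check is needed only when ApiVersions reports a
// newer maxFinalizedFeaturesEpoch.
class FeatureRefreshSketch {
    private long lastProcessedEpoch = -1;

    // Returns true when the caller should re-read the finalized features.
    synchronized boolean shouldRefresh(long maxFinalizedFeaturesEpoch) {
        if (maxFinalizedFeaturesEpoch > lastProcessedEpoch) {
            lastProcessedEpoch = maxFinalizedFeaturesEpoch;
            return true;
        }
        return false;
    }
}
```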

@jolshan (Contributor) commented Sep 6, 2024:

I think it gets the same thing accomplished. 👍

@jolshan (Contributor) commented Sep 6, 2024:

Hmm, have we checked the tests, @CalvinConfluent? There seem to be some issues (not just the threads, but some related to transactions).

@@ -971,7 +987,7 @@ synchronized boolean isInitializing() {
return isTransactional() && currentState == State.INITIALIZING;
}

void handleCoordinatorReady() {
synchronized void handleCoordinatorReady() {
Contributor:

What's the reason to make this synchronized?


public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
if (maxFinalizedFeaturesEpoch > nodeApiVersions.finalizedFeaturesEpoch()) {
Contributor:

I would expect that we still want to put the specific node's API versions into the per-node map. Then we can check the epoch and store the cluster-wide finalized features if the node reports a newer epoch. Something like this:

        this.nodeApiVersions.put(nodeId, nodeApiVersions);
        this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
        if (maxFinalizedFeaturesEpoch < nodeApiVersions.finalizedFeaturesEpoch()) {
          this.maxFinalizedFeaturesEpoch = nodeApiVersions.finalizedFeaturesEpoch();
          this.finalizedFeatures = nodeApiVersions.finalizedFeatures();
        }

Comment on lines +139 to +142
if (finalizedFeaturesEpoch == -1) {
this.finalizedFeatures = Collections.emptyMap();
return;
}
Contributor:

nit: do we need this special case, or would the code below just work? It's good to avoid extra code.

@@ -1570,6 +1576,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
}
this.interceptors.onAcknowledgement(metadata, exception);

Contributor:

nit: extra empty line?

@@ -397,6 +399,7 @@ private long sendProducerData(long now) {
// Update both readyTimeMs and drainTimeMs, this would "reset" the node
// latency.
this.accumulator.updateNodeLatencyStats(node.id(), now, true);
if (transactionManager != null) this.transactionManager.handleCoordinatorReady();
Contributor:

Do we still need this line, or is it a leftover from a previous version of this PR?

@@ -216,6 +221,10 @@ public String transactionalId() {
return transactionalId;
}

public boolean isTransactionV2Requested() {
Contributor:

Is this used?

@@ -216,6 +221,10 @@ public String transactionalId() {
return transactionalId;
}

public boolean isTransactionV2Requested() {
return version() > LAST_BEFORE_TRANSACTION_V2_VERSION;
Contributor:

Should we call the static function?

@@ -36,7 +36,12 @@
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
//
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-11",
//
// Version 12 is the same as version 11 (KIP-890). Note when produce requests are used in transaction, if
Contributor:

Should the extended comment be part of the request instead of the response, so that it's consistent with TxnOffsetCommitRequest?

}

/**
* This is an enum which handles the Partition Response based on the Request Version and the exact operation
* defaultError: This is the default workflow which maps to cases when the Produce Request Version or the Txn_offset_commit request was lower than the first version supporting the new Error Class
* genericError: This maps to the case when the clients are updated to handle the TransactionAbortableException
- * addPartition: This is a WIP. To be updated as a part of KIP-890 Part 2
+ * addPartition: This allows the partition to be added to the transactions inflight with the Produce and TxnOffsetCommit requests.
Contributor:

Is this the right code model to represent client capabilities? The way it's coded, it looks like there are three mutually exclusive options, but the options are not mutually exclusive: clients that support txnV2 can also support the rollback error.
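The reviewer's point could be illustrated with independent capability flags instead of three mutually exclusive enum constants (a hypothetical sketch; the class and field names are made up, and the version cut-offs follow the schema comments in this PR, where v11 adds the TRANSACTION_ABORTABLE error and v12 is transaction V2):

```java
// Capabilities modeled as independent booleans derived from the request
// version: a txnV2 client automatically also supports the abortable error,
// which an enum of mutually exclusive options cannot express.
class ClientCapabilities {
    final boolean supportsAbortableError; // assumed: request version >= 11
    final boolean supportsTransactionV2;  // assumed: request version >= 12

    ClientCapabilities(short requestVersion) {
        this.supportsAbortableError = requestVersion >= 11;
        this.supportsTransactionV2 = requestVersion >= 12;
    }
}
```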

@@ -848,6 +848,8 @@ class ReplicaManager(val config: KafkaConfig,
Errors.COORDINATOR_NOT_AVAILABLE |
Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
case Errors.UNKNOWN_PRODUCER_ID => Some(new OutOfOrderSequenceException(
Contributor:

Can we add a comment explaining why we translate this error this way? Also, it looks like we don't return this error anymore (at least not in the production code).
