
KAFKA-17367: Share coordinator impl. Broker side code. [2/N] #17011

Merged: 35 commits into apache:trunk on Sep 10, 2024

Conversation

@smjn (Contributor) commented Aug 27, 2024:

  • Added impl for ShareCoordinatorService and ShareCoordinatorShard
  • Moved group-coordinator: GroupCoordinatorRuntimeMetrics -> coordinator-common: CoordinatorRuntimeMetricsImpl. The new impl class can be inherited and used in both group and share coordinators.
  • Added tests wherever applicable
  • Added plumbing in BrokerServer and BrokerMetadataPublisher to create share coordinator and start/stop it.
  • Added ShareCoordinatorConfig class to house various coordinator related configs.
  • Added code to create share state topic in AutoTopicCreationManager.scala
  • Added share.coordinator.state.topic.replication.factor=1 and share.coordinator.state.topic.min.isr=1 to config/kraft/{broker.properties, controller.properties, server.properties} to make it easier for people to try out.

@mumrah (Contributor) left a comment:

Thanks for the patch @smjn! This is a pretty big PR, so I've just reviewed part of it for now.

Regarding the leader epoch, it seems like we are checking the MetadataImage for some things (like if a topic exists), but using the given leader epoch in the RPC to see if the leader has changed. I wonder if we should be checking the given leader epoch against the MetadataImage.

Where are we handling topic deletions and re-creations? Maybe this is coming in a future PR?

@@ -195,6 +202,7 @@ public KafkaApis build() {
replicaManager,
groupCoordinator,
txnCoordinator,
shareCoordinator,
Contributor:

Need a null check above

Contributor Author (@smjn):

The share coordinator can be null: there is a specific flag on the basis of which the share coordinator is initialized in BrokerServer; otherwise it is null. I can use an Optional perhaps.

@@ -534,7 +534,8 @@ class KafkaServer(
Some(adminManager),
Some(kafkaController),
groupCoordinator,
transactionCoordinator
transactionCoordinator,
null
Contributor:

Use Option instead of null here.

int partition = topicEntry.getKey();
try {
long timeTaken = time.hiResClockMs() - startTime;
WriteShareGroupStateResponseData partitionData = future.get();
Contributor:

This will block indefinitely if the future is not complete. I see we have combined these futures up on L309, but maybe we can replace this get with either getNow(null) or get(0L, TimeUnit.MILLISECONDS) just to ensure we're never blocking here.
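A minimal self-contained sketch of the two suggested alternatives (generic Java, not the PR's code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class NonBlockingGetSketch {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> f = CompletableFuture.completedFuture("done");

        // getNow is non-blocking: it returns the fallback if the future is incomplete.
        String v1 = f.getNow(null);

        // A zero-timeout get throws TimeoutException rather than blocking forever.
        String v2 = f.get(0L, TimeUnit.MILLISECONDS);

        System.out.println(v1 + " / " + v2);
    }
}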

Comment on lines 4450 to 4451
shareCoordinator.readState(request.context, readShareGroupStateRequest.data)
.thenAccept(data => requestHelper.sendMaybeThrottle(request, new ReadShareGroupStateResponse(data)))
Contributor:

You probably want CompletableFuture#handle here instead. That will allow you to handle exceptions from the future chain explicitly. See #12403 for some context.
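A generic Java sketch of the handle pattern (the names here are illustrative, not KafkaApis code):

import java.util.concurrent.CompletableFuture;

public class HandleSketch {
    public static void main(String[] args) {
        CompletableFuture<String> readFuture = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("coordinator not available");
        });

        // handle runs whether the stage completed normally or exceptionally,
        // so the error path can build a response instead of leaking the exception.
        readFuture.handle((data, ex) -> {
            if (ex != null) {
                return "error response: " + ex.getMessage();
            }
            return "response: " + data;
        }).thenAccept(System.out::println).join();
    }
}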

Comment on lines 516 to 518
private static <P> boolean isEmpty(List<P> list) {
return list == null || list.isEmpty() || list.get(0) == null;
}
Contributor:

In what case would we expect a list with a null first element?

Contributor Author (@smjn):

Fixed

log.debug("ShareCoordinatorService writeState request dump - {}", request);

String groupId = request.groupId();
Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponseData>>> futureMap = new HashMap<>();
Contributor:

nit: move this down closer to where it's used

WriteShareGroupStateResponseData partitionData = future.get();
// This is the future returned by runtime.scheduleWriteOperation which returns when the
// operation has completed including
shareCoordinatorMetrics.globalSensors.get(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME)
Contributor:

I'm not sure this is the pattern we want for metrics. In the group coordinator, I see we are calling record(String sensorName) rather than accessing globalSensors directly.

Comment on lines 313 to 316
List<WriteShareGroupStateResponseData.WriteStateResult> writeStateResults = futureMap.keySet().stream()
.map(topicId -> {
List<WriteShareGroupStateResponseData.PartitionResult> partitionResults = futureMap.get(topicId).entrySet().stream()
.map(topicEntry -> {
Contributor:

The nested streams make the control flow here hard to follow. See if you can untangle it a bit. Maybe regular for loops would be more readable.

.setStateEpoch(newStateEpoch)
.setStateBatches(batchesToAdd)
.build()));
snapshotUpdateCount.put(key, 0);
Contributor:

We should not update a timeline structure here. Recall that this method generates the records and the eventual response for the proposed write. We cannot update our in-memory state until the write is committed.

It looks like snapshotUpdateCount is used to determine when we should write a snapshot instead of a delta (L299). We can reset this counter to zero in handleShareSnapshot as we're replaying records
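A hypothetical sketch of that replay-time reset; shareStateMap and snapshotUpdateCount are this PR's fields, while the helpers marked below are assumed:

// Hypothetical: reset the counter while replaying the committed record,
// not while generating it.
private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value) {
    SharePartitionKey mapKey = SharePartitionKey.getInstance(
        key.groupId(), key.topicId(), key.partition());   // assumed helper
    shareStateMap.put(mapKey, ShareGroupOffset.fromRecord(value));  // assumed helper
    // A full snapshot supersedes earlier ShareUpdate deltas, so the
    // per-partition update counter restarts from zero here.
    snapshotUpdateCount.put(mapKey, 0);
}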

Comment on lines +428 to +429
// Updating the leader map with the new leader epoch
leaderEpochMap.put(coordinatorKey, leaderEpoch);
Contributor:

Why are we updating the in-memory state during a read? Is this part of the leader epoch fencing?

Contributor Author (@smjn):

Yes, it is. The caller might issue a read without ever making a write request.

Contributor:

Will this logic be replaced by the init RPC once it is added?

In general, we should not be updating our log-based in-memory state on reads. It's probably safe in this case just based on the nature of leader epoch, but it is definitely atypical.

Contributor Author (@smjn):

No, this will stay.
As per the spec, the initialize RPC does not include a leader epoch value which we can reference:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI

Contributor:

Ok. Can you add a comment at leaderEpochMap declaration noting that it can be updated on a read request?

Contributor:

Hmm, David has a valid point here. It's not clear why we need to update leaderEpoch on reads. My understanding of the fencing logic is that we just need to prevent an old reader from reading newly updated state. So, updating leaderEpoch on writes is enough. Also note that since this update is not persisted, it will be lost on leader change.
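For reference, a minimal sketch of the write-path fencing being discussed (leaderEpochMap, coordinatorKey and getWriteErrorResponse appear in this PR; the epoch accessor and surrounding shape are assumed):

// Sketch of the write-path check: fence a stale share-partition leader.
Integer lastEpoch = leaderEpochMap.get(coordinatorKey);
if (lastEpoch != null && partitionData.leaderEpoch() < lastEpoch) {  // leaderEpoch() assumed
    return Optional.of(getWriteErrorResponse(
        Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
}
// The epoch only becomes durable when the write record is committed and
// replayed; this in-memory update is lost on a coordinator failover.
leaderEpochMap.put(coordinatorKey, partitionData.leaderEpoch());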

@smjn (Contributor Author) commented Aug 27, 2024:

> Thanks for the patch @smjn! This is a pretty big PR, so I've just reviewed part of it for now.
>
> Regarding the leader epoch, it seems like we are checking the MetadataImage for some things (like if a topic exists), but using the given leader epoch in the RPC to see if the leader has changed. I wonder if we should be checking the given leader epoch against the MetadataImage.
>
> Where are we handling topic deletions and re-creations? Maybe this is coming in a future PR?

For EA (early access), only the read and write RPCs will be provided.

@mumrah (Contributor) left a comment:

Thanks for the updates 👍 Some additional comments inline.

}

log.info("Shutting down.");
isActive.set(false);
Contributor:

I think this is redundant due to L231

(partitionId, responseFut) -> {
try {
partitionResults.add(
responseFut.get(5000L, TimeUnit.MILLISECONDS).results().get(0).partitions().get(0)
Contributor:

Can you leave a comment here explaining that the future will already be completed at this point?

Comment on lines 145 to 146
if (config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot() < 0 || config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot() > 500)
throw new IllegalArgumentException("SnapshotUpdateRecordsPerSnapshot must be between 0 and 500.");
Contributor:

Shouldn't this kind of validation happen in the ShareCoordinatorConfig class?
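A sketch of what that might look like (the class name is from this PR; the field name and constructor shape are assumptions based on the check above):

// Hypothetical constructor-time validation inside ShareCoordinatorConfig.
public class ShareCoordinatorConfig {
    private final int snapshotUpdateRecordsPerSnapshot;

    public ShareCoordinatorConfig(int snapshotUpdateRecordsPerSnapshot) {
        if (snapshotUpdateRecordsPerSnapshot < 0 || snapshotUpdateRecordsPerSnapshot > 500) {
            throw new IllegalArgumentException(
                "snapshotUpdateRecordsPerSnapshot must be between 0 and 500.");
        }
        this.snapshotUpdateRecordsPerSnapshot = snapshotUpdateRecordsPerSnapshot;
    }

    public int shareCoordinatorSnapshotUpdateRecordsPerSnapshot() {
        return snapshotUpdateRecordsPerSnapshot;
    }
}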


Comment on lines 483 to 488
null, partitionId, Errors.INVALID_TOPIC_EXCEPTION, Errors.INVALID_TOPIC_EXCEPTION.message()));
}

if (partitionId < 0) {
return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
topicId, partitionId, Errors.INVALID_PARTITIONS, Errors.INVALID_PARTITIONS.message()));
Contributor:

Those error messages are pretty generic. We should be more specific here.

Contributor Author (@smjn):

I'll go with INVALID_REQUEST and a custom message then.

Comment on lines +207 to +210
handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) messageOrNull(value));
break;
case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: // ShareUpdate
handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) messageOrNull(value));
Contributor:

When can there be a null record value?

Contributor:

In the future when we delete these things, I expect we'll write tombstones as markers even though it's not a compacted topic. This enables us to bookkeep the records on the topic and work out what we can prune.

Contributor Author (@smjn):

Yes, when the Delete RPC is introduced (tombstones).

public ReadShareGroupStateResponseData readState(ReadShareGroupStateRequestData request, Long offset) {
throw new RuntimeException("Not implemented");
log.debug("Read request dump - {}", request);
Contributor:

We already have a way to log requests with log4j (log4j.logger.kafka.request.logger), so we don't need to log requests here. If we want to log some other details here (like "reading share state for partitions: [...]") that would be fine.
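For reference, that knob lives in config/log4j.properties; a sketch of enabling it (the appender name follows Kafka's shipped default config):

# DEBUG logs request summaries; TRACE includes full request/response contents.
log4j.logger.kafka.request.logger=DEBUG, requestAppender
log4j.additivity.kafka.request.logger=false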

Comment on lines 496 to 497
if (metadataImage != null && (metadataImage.topics().getTopic(topicId) == null ||
metadataImage.topics().getPartition(topicId, partitionId) == null)) {
Contributor:

As we discussed offline, this usage of MetadataImage is okay despite the fact that it may differ from the metadata known to the share partition leader.

For the newly created topic scenario, the UNKNOWN_TOPIC_OR_PARTITION is returned which is retriable. For the case of a recently deleted topic, there's no harm in letting the write go through.

@smjn (Contributor Author) commented Aug 29, 2024:

> Will this logic be replaced by the init RPC once it is added?
>
> In general, we should not be updating our log-based in-memory state on reads. It's probably safe in this case just based on the nature of leader epoch, but it is definitely atypical.

No, it will stay.
The initialize RPC does not include any information about the leader epoch which we can store and reference.
We are going by the epoch definition.

@AndrewJSchofield (Contributor) left a comment:

Thanks for the PR. A first round of review comments.

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
readShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not configured.")))
Contributor:

I would say that's an internal server error. Only Kafka code issues this RPC. It only does it when the share coordinator is enabled.

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
writeShareRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not configured.")))
Contributor:

And this one.

@@ -150,6 +159,34 @@ public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
Collections.singletonMap(CONSUMER_GROUP_COUNT_STATE_TAG, ConsumerGroupState.DEAD.toString())
);

shareGroupCountMetricName = metrics.metricName(
SHARE_GROUP_COUNT_METRIC_NAME,
Contributor:

These metric names do not match the KIP. I am certainly happy to tweak the KIP, but either way the KIP and the code need to match. Let me know what you want to do here @smjn .

String groupId = request.groupId();
// Send an empty response if groupId is invalid
if (isGroupIdEmpty(groupId)) {
log.error("Group id must be specified and non-empty: {}", request);
Contributor:

Probably INVALID_REQUEST if it's empty.

Contributor Author (@smjn):

It was previously decided to respond with an empty response. We even added a test for that.

long timeTaken = time.hiResClockMs() - startTime;
// This is the future returned by runtime.scheduleWriteOperation which returns when the
// operation has completed including
WriteShareGroupStateResponseData partitionData = responseFut.get(5000L, TimeUnit.MILLISECONDS);
Contributor:

Doesn't this only run when the individual futures are complete? I'm surprised you think you need to wait up to 5 seconds here.

@smjn (Contributor Author), Aug 29, 2024:

That was an arbitrary number. Fixed in the next revision.

WriteShareGroupStateResponseData partitionData = responseFut.get(5000L, TimeUnit.MILLISECONDS);
shareCoordinatorMetrics.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, timeTaken);
partitionResults.addAll(partitionData.results().get(0).partitions());
} catch (Exception e) {
Contributor:

Is there a reason why you're not using GroupCoordinatorService.handleOperationException?

Contributor Author (@smjn):

We can.


@AndrewJSchofield (Contributor) left a comment:

A few more comments.

@@ -35,7 +35,7 @@ import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, isInternal}
Contributor:

A trivial point, but I would make the topic names in the import statement alphabetical.

@@ -38,7 +38,8 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.server.config.{ServerConfigs, ShareCoordinatorConfig}
Contributor:

I wonder whether ShareCoordinatorConfig should be in package o.a.k.coordinator.share.

Contributor Author (@smjn):

Yes, there are some other interfaces and classes which need to be moved.
I will raise a separate PR for that to not pollute this one.

).asJava

val config = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
Contributor:

I believe it is the case that the new group coordinator is enabled by default in trunk now. I expect this test no longer needs to enable it.

public static final int STATE_TOPIC_NUM_PARTITIONS_DEFAULT = 50;
public static final String STATE_TOPIC_NUM_PARTITIONS_DOC = "The number of partitions for the share-group state topic (should not change after deployment).";

public static final String STATE_TOPIC_REPLICATION_FACTOR_CONFIG = "share.coordinator.state.topic.replication.factor";
Contributor:

As mentioned in the KIP, the values of share.coordinator.state.topic.replication.factor=1 and share.coordinator.state.topic.min.isr=1 should be included in the Internal Topic Settings section of the properties files in the directory config/kraft. I suggest you do that in this PR since the configs are being introduced now. This will make it a bit easier for people using a single-broker configuration taken directly from GitHub to try this out.

@mumrah (Contributor) left a comment:

Thanks for the updates @smjn! Few more comments.

topicEntry.forEach(
// map of partition id -> responses from api
(partitionId, responseFut) -> {
long timeTaken = time.hiResClockMs() - startTime;
Contributor:

This is an extremely pedantic nitpick, but we should get the current time once, outside of the loop; otherwise the time delta calculation will be somewhat skewed by the execution time of the loop.
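A sketch of the suggested hoisting (the names follow the surrounding code; the metrics call is the one used elsewhere in this PR):

// Read the clock once, before iterating, so loop execution time does not
// inflate the recorded latency for later partitions.
long timeTakenMs = time.hiResClockMs() - startTime;
topicEntry.forEach((partitionId, responseFut) ->
    shareCoordinatorMetrics.record(
        ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME,
        timeTakenMs));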

Comment on lines 439 to 440
futureMap.computeIfAbsent(topicId, k -> new HashMap<>());
futureMap.get(topicId).put(partitionData.partition(), future);
Contributor:

The Map "compute" methods return the value, so you don't need to get it again
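That collapses the two lines into one expression, for example:

// computeIfAbsent returns the existing or newly created inner map,
// so the separate get() is unnecessary.
futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
    .put(partitionData.partition(), future);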


// Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResponseData>
return combinedFuture.thenApply(v -> {
List<ReadShareGroupStateResponseData.ReadStateResult> readStateResult = new LinkedList<>();
Contributor:

Why a linked list here (and other similar places)? Seems like we know the expected size of the resulting list, so we could allocate an ArrayList with the size.
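For example (a sketch; it assumes futureMap.size() matches the expected result count):

// Presize the list since the number of per-topic results is known up front.
List<ReadShareGroupStateResponseData.ReadStateResult> readStateResult =
    new ArrayList<>(futureMap.size());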

Contributor:

@smjn do you plan on fixing this here or in a follow up PR? Either is fine by me.

@junrao (Contributor) left a comment:

@smjn : Thanks for the PR. Left a few comments.

@@ -1689,8 +1694,8 @@ class KafkaApis(val requestChannel: RequestChannel,
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)

case CoordinatorType.SHARE =>
// When share coordinator support is implemented in KIP-932, a proper check will go here
return (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
// None check already done above
Contributor:

Hmm, what check is the comment referring to?

Contributor Author (@smjn):

ShareCoordinator is an Optional. The line below the comment calls shareCoordinator.get directly, because the shareCoordinator.isEmpty check is already done on line 1703.

*/
@SuppressWarnings("NPathComplexity")
public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeState(
RequestContext context,
Contributor:

context seems unused?

if (stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) {
return Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
}
if (metadataImage != null && (metadataImage.topics().getTopic(topicId) == null ||
Contributor:

If metadataImage is null, we should return UNKNOWN_TOPIC_OR_PARTITION too, right?

} else {
// start offset is being updated - we should only
// consider new updates to batches
batchesToAdd = partitionData.stateBatches().stream()
Contributor:

Hmm, why do we only consider the new updates when the start offset changes? Consider the following example.

State in ShareCoordinator:
startOffset: 100
Batch1 {
firstOffset: 100
lastOffset: 109
deliverState: Acquired
deliverCount: 1
}
Batch2 {
firstOffset: 110
lastOffset: 119
deliverState: Acquired
deliverCount: 2
}
Batch3 {
firstOffset: 120
lastOffset: 129
deliverState: Acquired
deliverCount: 0
}

  1. Share leader acks batch 1 and sends the state of batch 1 to Share Coordinator.
  2. Share leader advances startOffset to 110.
  3. Share leader acks batch 3 and sends the new startOffset and the state of batch 3 to share coordinator.
  4. Share coordinator writes the snapshot with startOffset 110 and batch 3.

Now the deliver count for batch 2 is lost.

Contributor Author (@smjn):

Fixed, and added a test (testNonSequentialBatchUpdates) for this scenario.

private final Logger log;
private final Time time;
private final CoordinatorTimer<Void, CoordinatorRecord> timer;
private final ShareCoordinatorConfig config;
Contributor:

time, timer and config seem unused?

@smjn (Contributor Author), Sep 3, 2024:

time and timer are required by the CoordinatorShardBuilder runtime. Will remove the fields and keep the methods.

// should be complete as we used CompletableFuture::allOf to get a combined future from
// all futures in the map.
WriteShareGroupStateResponseData partitionData = responseFut.getNow(null);
shareCoordinatorMetrics.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, timeTaken);
Contributor:

timeTaken is the same for all topic partitions in the request. It doesn't seem right to record the same value per partition?

@smjn (Contributor Author) commented Sep 3, 2024:

@junrao Thanks for the review, incorporated comments.

@apoorvmittal10 apoorvmittal10 added the KIP-932 Queues for Kafka label Sep 6, 2024
@mumrah (Contributor) left a comment:

Few more comments, @smjn. I think we're close!

Comment on lines +299 to +302
request.topics().forEach(topicData -> {
Map<Integer, CompletableFuture<WriteShareGroupStateResponseData>> partitionFut =
futureMap.computeIfAbsent(topicData.topicId(), k -> new HashMap<>());
topicData.partitions().forEach(
Contributor:

Personally, I find the logic here a little hard to follow with the nesting and futures. I'd like us to consider refactoring this a bit in a future PR.



// be looping over the keys below and constructing new WriteShareGroupStateRequestData objects to pass
// onto the shard method.
Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponseData>>> futureMap = new HashMap<>();
long startTime = time.hiResClockMs();
Contributor:

nit: I'd name this something like "now" or "nowMs" to make it clear what it is. Sometimes "startTime" can refer to something in the far past (like the start of an event).


// time taken for write
shareCoordinatorMetrics.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME,
time.hiResClockMs() - startTime);
Contributor:

nit: you have defined startTime above. I think we should use that here.

Contributor Author (@smjn):

This is being used to calculate the delta; we want to record how much time the writeState call took.


@mumrah (Contributor) left a comment:

LGTM

I left another comment about some additional testing, but that can come later. I'd like to land this PR so we can unblock some other work and start making incremental improvements.

Comment on lines +558 to +562
private static List<PersisterOffsetsStateBatch> combineStateBatches(
Collection<PersisterOffsetsStateBatch> currentBatch,
Collection<PersisterOffsetsStateBatch> newBatch,
long startOffset
) {
Contributor:

I think we can tighten up this signature. We know what types are expected, so we can specify them here. That will make the code below a bit more understandable I think.

I'd also like to see some unit tests for this method. It seems like a good candidate.
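A hypothetical JUnit sketch of such a test (it assumes the method is made package-visible for testing, that batch equality is keyed on the offset range, and a (firstOffset, lastOffset, deliveryState, deliveryCount) constructor; all of these are assumptions):

import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class CombineStateBatchesTest {
    @Test
    public void newBatchReplacesOverlappingCurrentBatch() {
        PersisterOffsetsStateBatch current = new PersisterOffsetsStateBatch(100, 109, (byte) 0, (short) 1);
        PersisterOffsetsStateBatch updated = new PersisterOffsetsStateBatch(100, 109, (byte) 2, (short) 2);

        // Mutable copies, since the implementation removes matching entries in place.
        List<PersisterOffsetsStateBatch> combined = ShareCoordinatorShard.combineStateBatches(
            new ArrayList<>(List.of(current)), new ArrayList<>(List.of(updated)), 100L);

        // An identical offset range in the new batch should replace the old entry.
        assertEquals(List.of(updated), combined);
    }
}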

@apoorvmittal10 (Collaborator) left a comment:

Thanks for the PR.

@mumrah merged commit 821c101 into apache:trunk on Sep 10, 2024
5 of 7 checks passed
mingyen066 pushed a commit to mingyen066/kafka that referenced this pull request Sep 10, 2024
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It 
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster. 

Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
@junrao (Contributor) left a comment:

@smjn : Thanks for the updated PR. Left a few more followup comments.

@@ -313,6 +328,15 @@ class BrokerMetadataPublisher(
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
}
if (config.shareGroupConfig.isShareGroupEnabled && shareCoordinator.isDefined) {
Contributor:

In BrokerServer, we create shareCoordinator only if config.shareGroupConfig.isShareGroupEnabled is true. So, it seems there is no need to check config.shareGroupConfig.isShareGroupEnabled here?

* @return CoordinatorResult(records, response)
*/
@SuppressWarnings("NPathComplexity")
public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeState(
Contributor:

This method doesn't directly write the state. It generates the state to be written by the caller. So, perhaps rename to generateRecordsAndResult?


private final TimelineHashMap<SharePartitionKey, ShareGroupOffset> shareStateMap; // coord key -> ShareGroupOffset
// leaderEpochMap can be updated by writeState call
// or if a newer leader makes a readState call.
private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
Contributor:

Where is the synchronization on leaderEpochMap since it's read and written by different threads? Ditto for other TimelineHashMap below.

private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
private final TimelineHashMap<SharePartitionKey, Integer> snapshotUpdateCount;
private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
private MetadataImage metadataImage;
Contributor:

There is no synchronization on this. Should this be volatile?

shareStateMap.put(mapKey, offsetRecord);
// if number of share updates is exceeded, then reset it
if (snapshotUpdateCount.containsKey(mapKey)) {
if (snapshotUpdateCount.get(mapKey) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
Contributor:

Hmm, not sure I understand this. Every time we have a new snapshot, we should always reset the count to 0 independent the current count, right?

List<CoordinatorRecord> validRecords = new LinkedList<>();

WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
for (CoordinatorRecord record : recordList) { // should be single record
Contributor:

If recordList always contains a single record, why does it need to be a list?


if (shareStateMap.containsKey(mapKey) && shouldIncSnapshotEpoch) {
ShareGroupOffset oldValue = shareStateMap.get(mapKey);
((ShareSnapshotValue) record.value().message()).setSnapshotEpoch(oldValue.snapshotEpoch() + 1); // increment the snapshot epoch
Contributor:

Do we need to roll the value to 0 when it exceeds 65535?

) {
currentBatch.removeAll(newBatch);
List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
batchesToAdd.addAll(newBatch);
Contributor:

It would be useful to verify if this is an issue. Suppose currentBatch is
batch1 {
firstOffset: 100
lastOffset: 109
deliverState: Available
deliverCount: 1
}
and newBatch is
batch2 {
firstOffset: 105
lastOffset: 105
deliverState: Acknowledge
deliverCount: 1
}

After the call to combineStateBatches(), the share coordinator will have both batches in its state and thus the share leader could have both batches too (after initializing from ReadShareGroupState). Now suppose that the share leader fetches the following batch and calls SharePartition.acquire().

fetchedBatch{
firstOffset: 100
lastOffset: 109
}

Both batch1 and batch2 will match the fetched batch. When calling acquireSubsetBatchRecords() on batch1, we will add the full batch to AcquiredRecords. When calling acquireSubsetBatchRecords() on batch2, we will skip since the only record in it has been acked. But AcquiredRecords is unchanged after this. This means that we will return the full batch as acquired records, which is incorrect since offset 105 shouldn't be acquired.
