Gather attributes of downstream resources #61

Merged 1 commit on Feb 9, 2024

ryannedolan
Collaborator

Summary

The subscription controller now gathers "attributes" from downstream resources and reports them as .status.attributes. These attributes are open-ended and depend on the various downstream controllers, but currently include jobId, numPartitions, startTime, and more, as reported by the Kafka topic controller and Flink operator.

The Kafka topic controller now reports the actual number of partitions, which may diverge from the kafka.numPartitions hint. This means users can request a specific number of partitions with .spec.hints.kafka.numPartitions and then check .status.attributes.numPartitions to see the actual number of partitions.
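
To illustrate the hint-versus-actual check described above, here is a minimal Python sketch that compares the two values on a Subscription object already loaded as a dict (e.g. from `kubectl get subs -ojson`). The helper name `partition_drift` is hypothetical and not part of this PR. It is also an assumption that the hint may be stored either as a flat key `kafka.numPartitions` or nested under `kafka`, so the sketch tries both.

```python
from typing import Optional, Tuple


def _to_int(value) -> Optional[int]:
    # Attribute values are reported as strings (e.g. "1"), so convert them.
    return int(value) if value is not None else None


def partition_drift(subscription: dict) -> Tuple[Optional[int], Optional[int]]:
    """Return (requested, actual) partition counts for one Subscription.

    Hypothetical helper: 'requested' comes from .spec.hints and 'actual'
    from .status.attributes.numPartitions. Either may be None if absent.
    """
    hints = (subscription.get("spec") or {}).get("hints") or {}
    # Assumption: the hint is either a flat "kafka.numPartitions" key
    # or nested as {"kafka": {"numPartitions": ...}}.
    requested = hints.get("kafka.numPartitions")
    if requested is None and isinstance(hints.get("kafka"), dict):
        requested = hints["kafka"].get("numPartitions")

    attributes = (subscription.get("status") or {}).get("attributes") or {}
    actual = attributes.get("numPartitions")
    return _to_int(requested), _to_int(actual)


if __name__ == "__main__":
    sub = {
        "spec": {"hints": {"kafka.numPartitions": "4"}},
        "status": {"attributes": {"numPartitions": "1"}},
    }
    requested, actual = partition_drift(sub)
    if requested != actual:
        print(f"requested {requested} partitions, topic has {actual}")
```

A mismatch is not necessarily an error, since the hint is only a request and the attribute reflects what the topic controller observed.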

Details

Attributes are collected from downstream resources and the job, not from upstream/input resources. For example, numPartitions would refer to an output Kafka topic, not an input Kafka topic.

Since a sink/output table may be backed by multiple physical tables under the hood (e.g. an adapter may create multiple Kafka topics for a single table), attributes from different controllers can conflict. We make a best effort by bubbling up the last-seen attribute for a given key. This jibes with the strategy for applying "hints": they are applied across the entire set of output resources. In particular, setting the hint kafka.numPartitions = N will likely mean that .status.attributes.numPartitions = N, unless an adapter is doing something goofy.
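
The last-seen-wins behavior can be sketched in a few lines of Python. This is an illustration of the merge strategy only, not the controller's actual code. The function name `merge_attributes` and the sample inputs are made up for the example.

```python
from typing import Dict, Iterable


def merge_attributes(reported: Iterable[Dict[str, str]]) -> Dict[str, str]:
    """Merge attribute maps from downstream resources, in the order seen.

    When two resources report the same key, the later one overwrites the
    earlier one (last seen wins). Keys reported by only one resource are
    kept as-is.
    """
    merged: Dict[str, str] = {}
    for attributes in reported:
        merged.update(attributes)
    return merged


if __name__ == "__main__":
    # Hypothetical case: one table backed by two Kafka topics, plus a job.
    topic_a = {"numPartitions": "1"}
    topic_b = {"numPartitions": "4"}
    job = {"state": "RUNNING", "jobName": "products-flink-job"}
    print(merge_attributes([topic_a, topic_b, job]))
```

In this sketch the surviving `numPartitions` depends entirely on the order in which resources are visited, which is why conflicting values are only a best effort.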

Testing

Validated that the Flink operator and KafkaTopic controller bubble up attributes to Subscriptions:

$ kubectl get subs -ojsonpath={.items..status.attributes} | jq
{
  "jobId": "b3c1dd9f48ac15620e5d35ef17dfc041",
  "jobName": "products-flink-job",
  "numPartitions": "1",
  "startTime": "1707459424703",
  "state": "RUNNING",
  "updateTime": "1707459561622"
}

A neat consequence of this feature is that you can now check the state of the underlying Flink job.
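
As a rough sketch of how a script might consume that output, the Python below parses the attributes JSON and summarizes the job. The helper name `job_summary` is hypothetical. It also assumes that `startTime` and `updateTime` are epoch milliseconds, which the values above suggest but this PR does not state.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _from_millis(value: str) -> datetime:
    # Assumption: timestamps are epoch milliseconds encoded as strings.
    return EPOCH + timedelta(milliseconds=int(value))


def job_summary(attributes_json: str) -> dict:
    """Summarize a Subscription's .status.attributes JSON document."""
    attrs = json.loads(attributes_json)
    return {
        "jobName": attrs.get("jobName"),
        "running": attrs.get("state") == "RUNNING",
        "started": _from_millis(attrs["startTime"]) if "startTime" in attrs else None,
        "updated": _from_millis(attrs["updateTime"]) if "updateTime" in attrs else None,
    }


if __name__ == "__main__":
    sample = """{
      "jobId": "b3c1dd9f48ac15620e5d35ef17dfc041",
      "jobName": "products-flink-job",
      "numPartitions": "1",
      "startTime": "1707459424703",
      "state": "RUNNING",
      "updateTime": "1707459561622"
    }"""
    summary = job_summary(sample)
    print(summary["jobName"], "running:", summary["running"])
    print("started:", summary["started"].isoformat())
```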

Collaborator

@atoomula left a comment

nice!

@ryannedolan merged commit 927057e into main on Feb 9, 2024
1 check passed
@ryannedolan deleted the status-details branch on February 9, 2024, 20:00