feat(ingestion): Kafka stateful ingestion #4028
Awesome @claudio-benfatto! Could you actually follow the model in this PR: #3807, which is more refined? I'll do a detailed review shortly after that.
df97481 to 57c8e3e
Hi @rslanka, thank you for your advice. I tried to follow the code organisation of the PR you shared as much as possible. In this draft I also tried to add support for the [...] Thanks!
5779e7b to 5e291b1
b59844d to 39e2c29
test: some more advancement test: some improvements refactoring
39e2c29 to 9b5e977
source:
  type: kafka
  config:
    platform_instance: my_cluster
Stateful Ingestion is not enabled by default. So, why does this recipe provide only platform_instance but does not enable stateful ingestion?
@rslanka the idea behind this recipe, with only platform_instance enabled, is that this PR actually adds two different features for the kafka recipe:
- platform instance
- stateful ingestion
This recipe was just meant to test platform instance in isolation in SCENARIO 2 of the test below. I personally feel a bit more comfortable refactoring the code knowing that this test would catch possible mistakes (in the Dataset urn naming, in the BrowsePath, and in the DataPlatformInstanceClass attached to it), and I think that this is where it adds value.
However, if after this explanation you still think it is better to move it somewhere else, refactor it, or remove it entirely, I'm also fine with that :)
We can get rid of this test altogether. The concept of platform_instance is introduced only to support stateful ingestion, where it provides a stable identifier for the upstream source to save/retrieve the check-pointed state in a deterministic fashion.
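To illustrate the point about a stable identifier, here is a minimal sketch of how a checkpoint could be keyed so that the same source always finds its own saved state. It is not the DataHub implementation: `CheckpointKey`, `InMemoryCheckpointStore`, and `stale_topics` are hypothetical names invented for this example, and the in-memory dictionary stands in for whatever backend actually persists the state.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Set


@dataclass(frozen=True)
class CheckpointKey:
    """Hypothetical key identifying the saved state of one upstream source."""

    pipeline_name: str
    platform: str
    platform_instance: str


class InMemoryCheckpointStore:
    """Hypothetical stand-in for a real checkpoint backend."""

    def __init__(self) -> None:
        self._state: Dict[CheckpointKey, Set[str]] = {}

    def save(self, key: CheckpointKey, topics: Iterable[str]) -> None:
        self._state[key] = set(topics)

    def load(self, key: CheckpointKey) -> Optional[Set[str]]:
        return self._state.get(key)


def stale_topics(
    store: InMemoryCheckpointStore, key: CheckpointKey, current_topics: Iterable[str]
) -> Set[str]:
    """Return topics seen in the previous run but missing from the current one."""
    previous = store.load(key) or set()
    return previous - set(current_topics)
```

Because the key is built only from values that stay the same between runs, two runs against the same cluster resolve to the same checkpoint, while a different `platform_instance` starts from an empty state.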
I removed this test from the integration ones and I added some unit coverage instead.
The rationale is that I think that the unit test provides more or less the same value without adding unnecessary boilerplate and hard-to-maintain json documents (as it happens for the integration tests).
"The concept of platform_instance is introduced only to support stateful ingestion"
From a customer standpoint, I'd slightly disagree with this statement :) There is actually value for us in enabling platform instances, with or without stateful ingestion.
Until now we were ingesting datasets using the topic@platform_instance format in the urn. That workaround is exactly the problem platform instances solve for us, and we are very happy to drop it.
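A small sketch of the difference being described, assuming a Kafka topic `orders` on a cluster called `my_cluster`. Both helpers are hypothetical and written only for this example; in particular, the URN layout with the instance prefixed to the dataset name is an assumption about how a platform instance ends up in the urn, not something confirmed in this thread.

```python
from typing import Optional


def legacy_dataset_name(topic: str, cluster: str) -> str:
    """Workaround described above: the cluster is baked into the dataset name."""
    return f"{topic}@{cluster}"


def dataset_urn(
    name: str,
    platform: str = "kafka",
    env: str = "PROD",
    platform_instance: Optional[str] = None,
) -> str:
    """Hypothetical URN builder; the exact layout is an assumption."""
    qualified = f"{platform_instance}.{name}" if platform_instance else name
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{qualified},{env})"


# Old style: the topic name itself carries the cluster suffix.
old_urn = dataset_urn(legacy_dataset_name("orders", "my_cluster"))

# New style: the topic name stays clean and the instance is a separate input.
new_urn = dataset_urn("orders", platform_instance="my_cluster")
```

With the instance passed as its own parameter, the topic name no longer needs a naming convention to stay unique across clusters.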
I see your point. Sounds good!
@@ -37,3 +38,19 @@ def test_kafka_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
        golden_path=test_resources_dir / "kafka_mces_golden.json",
        ignore_paths=[],
    )

    # SCENARIO 2: with platform instance
    # Run the metadata ingestion pipeline with: platform instance.
Not sure if we really need this test.
            # the topic within the test code
            print("Failed to delete topic {}: {}".format(topic, e))

    def __enter__(self):
Elegant state management via the context manager, nicely done!
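For readers without the diff at hand, the pattern being praised can be sketched roughly as follows. This is not the PR's code: `TemporaryTopic` and `FakeAdminClient` are hypothetical names, and the fake client replaces a real Kafka admin client so the example runs on its own. Only the "Failed to delete topic" message mirrors the snippet above.

```python
class FakeAdminClient:
    """Hypothetical in-memory replacement for a Kafka admin client."""

    def __init__(self) -> None:
        self.topics = set()

    def create_topic(self, name: str) -> None:
        self.topics.add(name)

    def delete_topic(self, name: str) -> None:
        # Raises KeyError when the topic was already removed.
        self.topics.remove(name)


class TemporaryTopic:
    """Create a topic on entry and always try to delete it on exit."""

    def __init__(self, admin: FakeAdminClient, topic: str) -> None:
        self.admin = admin
        self.topic = topic

    def __enter__(self) -> "TemporaryTopic":
        self.admin.create_topic(self.topic)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            self.admin.delete_topic(self.topic)
        except Exception as e:
            # Cleanup failures are reported but must not mask test failures.
            print("Failed to delete topic {}: {}".format(self.topic, e))
        return False  # never swallow exceptions raised inside the with-block
```

A test would then wrap its body in `with TemporaryTopic(admin, "my_topic"):` so the topic is cleaned up even when an assertion fails midway.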
LGTM!
LGTM
* test: test stateful ingestion for kafka test: some more advancement test: some improvements refactoring
* refactor: remove some linter modifications
* tests: add unit tests for kafka state
* refactor: minor changes
* tests: improve test coverage
* fix: fix naming
* style: fix format with black
* fix: fix broken test
* revert: revert smoke tests to master
* feat: add reporting to kafka source
* tests: add smoke tests for kafka reporting
* revert: revert changes to the smoke tests
* test: add kafka integration test for stateful ingestion
* docs: update documentation on kafka source
* fix: return empty string when no platform instance
* revert: remove unwanted file
* fix: solve problem with platform instance
* chore: use console sink instead of file
* fix: disable complexity check for _extract_record
* fix: remove if condition in get_platform_instance_id
* chore: remove unneeded integration test
* test: test platform instance in kafka source unit tests