feat(ingestion): Kafka stateful ingestion #4028

Merged

Contributor

@claudio-benfatto claudio-benfatto commented Feb 1, 2022

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable)

@github-actions

github-actions bot commented Feb 1, 2022

Unit Test Results (build & test)

70 files ±0, 70 suites ±0, 13m 8s ⏱️ +13s
609 tests ±0: 550 passed ✔️ ±0, 59 skipped 💤 ±0, 0 failed ±0

Results for commit ad880a1. ± Comparison against base commit 06bb033.

♻️ This comment has been updated with latest results.

Contributor

@rslanka rslanka left a comment

Awesome @claudio-benfatto! Could you actually follow the model in this PR: #3807, which is more refined? I'll do a detailed review shortly after that.

@github-actions

github-actions bot commented Feb 1, 2022

Unit Test Results (metadata ingestion)

3 files, 3 suites, 42m 36s ⏱️
317 tests: 317 passed ✔️, 0 skipped 💤, 0 failed
908 runs: 879 passed ✔️, 29 skipped 💤, 0 failed

Results for commit ad880a1.

♻️ This comment has been updated with latest results.

@claudio-benfatto claudio-benfatto force-pushed the kafka_stateful_ingestion branch from df97481 to 57c8e3e Compare February 1, 2022 21:22
@claudio-benfatto
Contributor Author

Awesome @claudio-benfatto! Could you actually follow the model in this PR: #3807, which is more refined? I'll do a detailed review shortly after that.

Hi @rslanka thank you for your advice.

I tried to follow the code organisation of the PR you shared as closely as possible.
However, since some of its changes affect the base classes, I could not follow it completely without rebasing onto that PR's branch.
For a complete refactoring, it might make sense to wait until that PR is merged.

In this draft I also tried to add support for platform_instance, because I don't think stateful ingestion would make sense without it. However, it may be more convenient to open a separate PR for it.

Thanks!

@claudio-benfatto claudio-benfatto force-pushed the kafka_stateful_ingestion branch from 5779e7b to 5e291b1 Compare February 3, 2022 15:47
@claudio-benfatto claudio-benfatto marked this pull request as ready for review February 6, 2022 21:04
@claudio-benfatto claudio-benfatto force-pushed the kafka_stateful_ingestion branch 2 times, most recently from b59844d to 39e2c29 Compare February 9, 2022 08:36
source:
  type: kafka
  config:
    platform_instance: my_cluster
Contributor

Stateful ingestion is not enabled by default. So why does this recipe set only platform_instance without enabling stateful ingestion?

Contributor Author

@rslanka the idea behind this recipe, which sets only platform_instance, is that this PR actually adds two different features to the kafka source:

  • platform instance
  • stateful ingestion

This recipe was only meant to test platform instance in isolation, in SCENARIO 2 of the test below. I personally feel more comfortable refactoring the code knowing that this test would catch possible mistakes (in the Dataset urn naming, in the BrowsePath, and in the DataPlatformInstanceClass attached to it), and I think that is where it adds value.
However, if after this explanation you still think it is better to move it somewhere else, refactor it, or remove it entirely, I'm fine with that too :)

Contributor

We can get rid of this test altogether. The concept of platform_instance is introduced only to support stateful ingestion, where it provides a stable identifier for the upstream source to save/retrieve the check-pointed state in a deterministic fashion.
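To make the check-pointing idea concrete, here is a minimal sketch, assuming an in-memory store; it is not the implementation in this PR. The names `state_key`, `CheckpointStore`, and `stale_topics` are hypothetical. It only illustrates why a stable platform_instance helps: the key used to save the previous run's topics has to be reproducible on the next run.

```python
from typing import Dict, Optional, Set, Tuple

# Hypothetical key layout: (pipeline name, platform, platform instance).
StateKey = Tuple[str, str, str]


def state_key(pipeline_name: str, platform: str, platform_instance: Optional[str]) -> StateKey:
    """Build a deterministic key; a missing instance maps to an empty string."""
    return (pipeline_name, platform, platform_instance or "")


class CheckpointStore:
    """In-memory stand-in for a persistent checkpoint backend (an assumption)."""

    def __init__(self) -> None:
        self._states: Dict[StateKey, Set[str]] = {}

    def load(self, key: StateKey) -> Set[str]:
        # Return a copy so callers cannot mutate the stored state.
        return set(self._states.get(key, set()))

    def save(self, key: StateKey, topics: Set[str]) -> None:
        self._states[key] = set(topics)


def stale_topics(store: CheckpointStore, key: StateKey, current: Set[str]) -> Set[str]:
    """Return topics seen in the previous run but missing now, then checkpoint the current set."""
    previous = store.load(key)
    store.save(key, current)
    return previous - current


store = CheckpointStore()
key = state_key("kafka_pipeline", "kafka", "my_cluster")
first_run = stale_topics(store, key, {"orders", "payments"})
second_run = stale_topics(store, key, {"orders"})
```

In this sketch the first run reports nothing stale because no earlier state exists, and the second run reports the topic that disappeared between runs.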

Contributor Author

I removed this test from the integration tests and added some unit coverage instead.
The rationale is that the unit test provides more or less the same value without adding unnecessary boilerplate and hard-to-maintain JSON documents (as is the case for the integration tests).

The concept of platform_instance is introduced only to support stateful ingestion

From a customer standpoint, I'll allow myself to slightly disagree with this statement :) There is actual value for us in enabling platform instances, with or without stateful ingestion.
Until now we have been ingesting datasets using the topic@platform_instance format in the urn, and we are very happy to drop it. That is exactly the problem platform instances solve for us.
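As an illustration of the naming difference, the sketch below contrasts the two conventions. `legacy_urn` and `instance_urn` are hypothetical helpers, and the instance-qualified layout (`<platform_instance>.<topic>`) is an assumption about how the instance ends up in the dataset name, not something confirmed in this thread.

```python
def legacy_urn(topic: str, cluster: str, env: str = "PROD") -> str:
    """Workaround style: the cluster is encoded into the dataset name by hand."""
    return f"urn:li:dataset:(urn:li:dataPlatform:kafka,{topic}@{cluster},{env})"


def instance_urn(topic: str, platform_instance: str = "", env: str = "PROD") -> str:
    """Platform-instance style (assumed layout): the instance prefixes the topic name."""
    name = f"{platform_instance}.{topic}" if platform_instance else topic
    return f"urn:li:dataset:(urn:li:dataPlatform:kafka,{name},{env})"


old_style = legacy_urn("orders", "my_cluster")
new_style = instance_urn("orders", "my_cluster")
no_instance = instance_urn("orders")
```

With the second helper, the cluster name no longer has to be carried inside the topic name by the caller, and an empty instance leaves the plain topic name untouched.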

Contributor

I see your point. Sounds good!

@@ -37,3 +38,19 @@ def test_kafka_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
        golden_path=test_resources_dir / "kafka_mces_golden.json",
        ignore_paths=[],
    )

    # SCENARIO 2: with platform instance
    # Run the metadata ingestion pipeline with: platform instance.
Contributor

Not sure if we really need this test.

# the topic within the test code
print("Failed to delete topic {}: {}".format(topic, e))

def __enter__(self):
Contributor

@rslanka rslanka Feb 14, 2022

Elegant state management via the context manager, nicely done!
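For readers without the diff at hand, the pattern being praised can be sketched roughly as follows. This is not the code from the PR: `TemporaryTopic` and `InMemoryAdmin` are hypothetical names, and the in-memory admin stands in for a real Kafka admin client so that the example runs on its own.

```python
from typing import List


class InMemoryAdmin:
    """Hypothetical stand-in for a Kafka admin client; it only tracks topic names."""

    def __init__(self) -> None:
        self.topics: List[str] = []

    def create_topic(self, topic: str) -> None:
        self.topics.append(topic)

    def delete_topic(self, topic: str) -> None:
        self.topics.remove(topic)


class TemporaryTopic:
    """Create a topic on entry and delete it on exit, even if the test body raises."""

    def __init__(self, admin: InMemoryAdmin, topic: str) -> None:
        self.admin = admin
        self.topic = topic

    def __enter__(self) -> "TemporaryTopic":
        self.admin.create_topic(self.topic)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            self.admin.delete_topic(self.topic)
        except Exception as e:
            # Cleanup failures are reported but must not mask a test failure.
            print("Failed to delete topic {}: {}".format(self.topic, e))
        return False  # never swallow exceptions raised inside the with-block


admin = InMemoryAdmin()
with TemporaryTopic(admin, "test_topic"):
    topics_inside = list(admin.topics)
topics_after = list(admin.topics)
```

Because `__exit__` runs on both success and failure, the topic state seen by later tests stays predictable.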

Contributor

@rslanka rslanka left a comment

LGTM!

Contributor

@shirshanka shirshanka left a comment

LGTM

@shirshanka shirshanka merged commit aeefde4 into datahub-project:master Feb 15, 2022
hevandro-veiga pushed a commit to hevandro-veiga/datahub that referenced this pull request Feb 18, 2022
* test: test stateful ingestion for kafka

test: some more advancement

test: some improvements

refactoring

* refactor: remove some linter modifications

* tests: add unit tests for kafka state

* refactor: minor changes

* tests: improve test coverage

* fix: fix naming

* style: fix format with black

* fix: fix broken test

* revert: revert smoke tests to master

* feat: add reporting to kafka source

* tests: add smoke tests for kafka reporting

* revert: revert changes to the smoke tests

* test: add kafka integration test for stateful ingestion

* docs: update documentation on kafka source

* fix: return empty string when no platform instance

* revert: remove unwanted file

* fix: solve problem with platform instance

* chore: use console sink instead of file

* fix: disable complexity check for _extract_record

* fix: remove if condition in get_platform_instance_id

* chore: remove unneeded integration test

* test: test platform instance in kafka source unit tests
maggiehays pushed a commit to maggiehays/datahub that referenced this pull request Aug 1, 2022