[FLINK-36001][state] Store operator name and UID in state metadata #25267

Merged: 1 commit on Sep 13, 2024

Conversation

gaborgsomogyi
Contributor

What is the purpose of the change

Users can assign a UID and a name to an operator in a Flink application like this:

stream.map(new ValueProcessUDF())
       .uid("value-process-uid")
       .name("value-process-name")

Currently, the checkpoint metadata file does not contain the operator UID or name, which makes it hard for users to work out the human-readable intention of a specific operator. Please see FLIP-474 for further details.

This PR adds these two fields to the metadata file.

Brief change log

Added the operator UID and name to the metadata file.

Verifying this change

  • Existing + modified + new automated tests
  • Manually on cluster (read, write, operator UID change)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@gaborgsomogyi
Contributor Author

To help reviewers, here is the general concept. The UID and name fields are optional, so they are null in the initial part of the code chain. I've kept the null values until MetadataV5Serializer, where a null is stored as an empty string. On the way back, an empty string is converted to null to keep things consistent. The alternative was to come up with a new optional-string storage mechanism, which I tried, but in my view it would only add complexity with relatively high risk and no gain.
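The null-to-empty-string convention described above can be sketched as follows. This is only an illustration and not the actual MetadataV5Serializer code: the class `OptionalStringCodec`, its methods, and the use of `writeUTF`/`readUTF` are all assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper; the name and wire format are assumptions, not Flink code. */
final class OptionalStringCodec {

    private OptionalStringCodec() {}

    /** Writes null as the empty string, so no separate presence flag is needed. */
    static void write(DataOutputStream out, String value) throws IOException {
        out.writeUTF(value == null ? "" : value);
    }

    /** Reads a string and maps the empty string back to null. */
    static String read(DataInputStream in) throws IOException {
        String value = in.readUTF();
        return value.isEmpty() ? null : value;
    }

    /** Serializes and deserializes one value in memory. */
    static String roundTrip(String value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(out, value);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return read(in);
            }
        } catch (IOException e) {
            // In-memory streams should not fail; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("value-process-uid"));
        System.out.println(roundTrip(null));
    }
}
```

One consequence of this convention is that an explicitly empty string cannot be told apart from an absent value after a round trip: both come back as null.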

}

@Test
public void testChangeUidHashOnly(@TempDir Path tmp) throws Exception {
Contributor Author

Some extra coverage where only the hash changed.

- String savepointPath, Tuple2<Collection<Integer>, String>... assertions)
- throws Exception {
+ String savepointPath, ValidationParameters... validationParameters) throws Exception {
// validate metadata
Contributor Author

@gaborgsomogyi Aug 29, 2024

The intention of the extra assertions here is to make sure that the UID and hash are consistent with each other.
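A consistency check of this kind might look like the sketch below. It is not the test code from this PR: the class `UidHashConsistencySketch` and its methods are hypothetical, and MD5 is swapped in as a stand-in 128-bit hash so that the example needs only the JDK. In Flink, the operator ID would come from `OperatorIDGenerator.fromUid`. Treating a missing UID as consistent is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical sketch; MD5 is a stand-in hash, not what Flink uses. */
final class UidHashConsistencySketch {

    private UidHashConsistencySketch() {}

    /** Derives a 16-byte ID from a UID (stand-in for OperatorIDGenerator.fromUid). */
    static byte[] idFromUid(String uid) {
        try {
            return MessageDigest.getInstance("MD5")
                    .digest(uid.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // MD5 must be available on every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    /** Returns true when the stored ID is the one derived from the stored UID. */
    static boolean isConsistent(String storedUid, byte[] storedId) {
        if (storedUid == null) {
            // Assumption: with no UID recorded there is nothing to cross-check.
            return true;
        }
        return Arrays.equals(idFromUid(storedUid), storedId);
    }

    public static void main(String[] args) {
        byte[] id = idFromUid("uid");
        System.out.println(isConsistent("uid", id));
        System.out.println(isConsistent("other-uid", id));
    }
}
```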

@@ -213,7 +213,7 @@ private List<Integer> readInputSplit(
throws IOException {
KeyedStateInputFormat<Integer, VoidNamespace, Integer> format =
new KeyedStateInputFormat<>(
- new OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4),
+ new OperatorState(null, null, OperatorIDGenerator.fromUid("uid"), 1, 4),
Contributor Author

For such cases I considered adding a constructor where the UID and name fields are hardcoded to null (then we wouldn't need so many code changes in tests), but with it I ended up missing production code paths where these fields are essential, which led to a partially working solution. I think a rock-solid state story is a must-have from the user's perspective, so I implemented this in a more defensive way (the compiler fails when a constructor call lacks parameters) to make sure all the required execution paths are changed.
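The design choice can be illustrated with a small stand-in class. This is only a sketch: `OperatorStateSketch`, its fields, and its accessors are hypothetical and do not mirror Flink's `OperatorState`. The point is that the class offers a single constructor, so every call site has to pass the name and UID explicitly, even when they are null.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for a state class; not Flink's OperatorState. */
final class OperatorStateSketch {

    private final String operatorName; // nullable: the user may not have set a name
    private final String operatorUid;  // nullable: the user may not have set a UID
    private final String operatorId;
    private final int parallelism;
    private final int maxParallelism;

    // The only constructor. There is deliberately no overload that defaults
    // name and UID to null, so a call site that forgets them fails to compile.
    OperatorStateSketch(
            String operatorName,
            String operatorUid,
            String operatorId,
            int parallelism,
            int maxParallelism) {
        this.operatorName = operatorName;
        this.operatorUid = operatorUid;
        this.operatorId = Objects.requireNonNull(operatorId, "operatorId");
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }

    Optional<String> getOperatorName() {
        return Optional.ofNullable(operatorName);
    }

    Optional<String> getOperatorUid() {
        return Optional.ofNullable(operatorUid);
    }

    String getOperatorId() {
        return operatorId;
    }

    public static void main(String[] args) {
        // Tests that do not care about name and UID must still spell out the nulls.
        OperatorStateSketch state = new OperatorStateSketch(null, null, "op-1", 1, 4);
        System.out.println(state.getOperatorUid().isPresent());
    }
}
```

The cost is noisier test code; the benefit is that the compiler points at every place where the new fields have to be considered.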

@gaborgsomogyi
Contributor Author

cc @gyfora

@gyfora
Contributor

gyfora commented Sep 3, 2024

Thanks @gaborgsomogyi , overall looks good. Please add 1-2 comments to the critical parts that you also explain in the PR.

@gaborgsomogyi
Contributor Author

Just rebased and added some explanation to the code.

@gaborgsomogyi
Contributor Author

@Zakelly any comments/thoughts?

@Zakelly
Contributor

Zakelly commented Sep 9, 2024

> @Zakelly any comments/thoughts?

Thanks for the heads up, looking into this...

Contributor

@Zakelly left a comment

Thanks for the PR, I left some comments, PTAL.

@gaborgsomogyi gaborgsomogyi force-pushed the FLINK-36001 branch 2 times, most recently from 7772b98 to b3ee07b Compare September 12, 2024 08:25
Contributor

@Zakelly left a comment

Thanks for the update! Overall LGTM.

@gaborgsomogyi gaborgsomogyi merged commit 02110ce into apache:master Sep 13, 2024
@fapaul

fapaul commented Sep 13, 2024

@gaborgsomogyi this PR broke the master build because the license header is missing from OperatorIDPairTest. Can you fix that?

@gaborgsomogyi
Contributor Author

Oh gosh, fixing it now.

@gaborgsomogyi
Contributor Author

Just filed: #25329

5 participants