[FLINK-33525] Move ImpulseSource to new Source API #950

Open: 4 commits to merge into base branch main


@Poorvankbhatia Poorvankbhatia commented Feb 27, 2025

What is the purpose of the change

This pull request migrates the ImpulseSource (used in LoadSimulationPipeline) to use Flink's DataGeneratorSource, replacing the older SourceFunction.

Brief change log

  • Replaced the old ImpulseSource (which implemented SourceFunction) with a new implementation using Flink's DataGeneratorSource.
  • Updated LoadSimulationPipeline to use env.fromSource(...) instead of env.addSource(...).
  • Ensured that the new implementation preserves the same functionality as the old one.

Verifying this change

This change is already covered by existing tests, such as:

  • Existing load simulation tests that verify the source is producing expected records.
  • The Flink job runs with the updated ImpulseSource without functional changes.
  • The change was manually verified by running a Flink job with the new ImpulseSource.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? (not applicable)

@Poorvankbhatia Poorvankbhatia changed the title Move ImpulseSource to new Source API [FLINK-33525] Move ImpulseSource to new Source API Feb 27, 2025
Contributor

@afedulov afedulov left a comment

@Poorvankbhatia thanks for the PR. Did you evaluate if this can be achieved based off of the existing DataGeneratorSource/GeneratorFunction? Generally, our approach should be to try to capture usage patterns that might also be required for generating synthetic data by Flink users and make them available by providing utilities in the generator package.

@afedulov afedulov requested a review from mxm February 28, 2025 14:16
@Poorvankbhatia
Author

Poorvankbhatia commented Feb 28, 2025

> @Poorvankbhatia thanks for the PR. Did you evaluate if this can be achieved based off of the existing DataGeneratorSource/GeneratorFunction? Generally, our approach should be to try to capture usage patterns that might also be required for generating synthetic data by Flink users and make them available by providing utilities in the generator package.

Yes, absolutely! However, using DataGeneratorSource requires adding a dependency on flink-connector-datagen in autoscaling's pom.xml. I wasn't sure if that was acceptable, so I replicated the original source instead. Let me know if I can add that dependency.

@afedulov
Contributor

afedulov commented Feb 28, 2025

> However, using DataGeneratorSource requires adding a dependency on flink-connector-datagen in autoscaling's pom.xml.

I think it is fine - datagen's only major transitive dependency is on flink-core which is already present in the operator.
@gyfora what do you think?

@gyfora
Contributor

gyfora commented Feb 28, 2025

Since this is only an example, we should focus on simplicity and add the extra dependency to use the built-in generator functionality. We should not add new dependencies to the autoscaler, only to this example module.

@Poorvankbhatia
Author

> > However, using DataGeneratorSource requires adding a dependency on flink-connector-datagen in autoscaling's pom.xml.
>
> I think it is fine - datagen's only major transitive dependency is on flink-core which is already present in the operator. @gyfora what do you think?

Updated the code. Please have a look. Thanks.

* Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
*
* Rate Calculation:
* - samplingIntervalMs / 10 gives maxSleepTimeMs, which represents the interval between emissions.
Contributor

@afedulov afedulov Mar 5, 2025

I believe this section needs to be adjusted. We do not control maxSleepTimeMs directly; it is governed internally by Guava's token bucket algorithm, so it should not be mentioned explicitly here. It is also not strictly guaranteed to be a maximum: at startup there can be a short burst. The value 10 seems to be just a hardcoded parameter meaning that we want at least 10 impulses per sampling interval. Basically, the comment should rather explain: check how many sampling intervals there are within a second, and make sure that 10 impulses are generated for each sampling interval (IMPULSES_PER_SAMPLING_INTERVAL).
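
For illustration, the arithmetic described in this review comment can be sketched in plain Java. This is only a sketch under assumptions: the class and method names are hypothetical and not taken from the PR; only the constant name IMPULSES_PER_SAMPLING_INTERVAL and its value 10 come from the discussion. The resulting records-per-second figure is the kind of value a per-second rate limiter strategy would presumably take.

```java
// Hypothetical helper, not the PR's actual code.
final class ImpulseRate {

    // Name and value taken from the review discussion.
    static final int IMPULSES_PER_SAMPLING_INTERVAL = 10;

    /**
     * Computes how many impulses per second are needed so that each
     * sampling interval sees IMPULSES_PER_SAMPLING_INTERVAL impulses.
     */
    static double impulsesPerSecond(long samplingIntervalMs) {
        if (samplingIntervalMs <= 0) {
            throw new IllegalArgumentException("samplingIntervalMs must be positive");
        }
        // Step 1: how many sampling intervals fit into one second.
        double samplingIntervalsPerSecond = 1000.0 / samplingIntervalMs;
        // Step 2: scale by the number of impulses wanted per interval.
        return samplingIntervalsPerSecond * IMPULSES_PER_SAMPLING_INTERVAL;
    }

    public static void main(String[] args) {
        // A 500 ms sampling interval gives 2 intervals per second.
        System.out.println(impulsesPerSecond(500)); // prints 20.0
    }
}
```

With a 500 ms sampling interval this yields 20 impulses per second, which matches the old formulation (500 / 10 = 50 ms between emissions) without referring to a sleep time that the rate limiter does not actually expose.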

Author

Updated the comment. Let me know if this makes sense.
