[DRAFT] [FLINK-24493] [flink-connector-base] Introduce DemultiplexingSink #27072
+2,123
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Per the discussions within FLINK-24493, support does not presently exist for dynamically routing incoming elements to specific sinks at runtime, based on some characteristic of the incoming element. This pull request addresses that by implementing a new
DemultiplexingSink
construct that will support such behavior.The sink implementation consists of two components: the
DemultiplexingSink
itself along with a correspondingSinkRouter
interface, whos implementation will govern how a given element will be mapped to a specific sink at runtime. These active routes (i.e. those that have been previously initialized) will be stored within the internal state of the sink to prevent unnecessary sink initialization for existing sinks, essentially functioning as a cache.Visual Example
The example above demonstrates a simple DemultiplexingSink with a router configured to route each element to a destination that corresponds with the first character of the element (e.g. "apple" -> A, "banana" -> B, etc.).
Example Usage
An example demonstrating this behavior might look like the following:
This example demonstrates consuming a series of
YourElement
elements and routes them to dynamic Kafka topics based on a specific attribute within the object itself through the following chain of events:SinkRouter
instance responsible for defining the logic to route the element to its destination topic (in this case a predefinedelement.getTopicName()
implementation)DemultiplexingSink<YourElement, String>
instance that uses the previously definedSinkRouter
instance.SinkRouter.getRoute()
will be executed to determine the sink to route the element to.NOTE: The
SinkRouter
implementation is not limited to String-based keys, so the above example could easily support dynamic routing to different Kafka brokers, topics, etc. Implementations using other popular sinks such as JDBC, Elasticsearch, etc. may likely require additional fields to resolve depending on the level of dynamic behavior required (e.g. including credentials, etc.)Brief change log
DemultiplexingSink
and relatedSinkRouter
interface to support dynamic sink creation and routing at runtime.DemultiplexingSinkState
,DemultiplexingSinkStateSerializer
, andDemultiplexingSinkWriter
related to stateful sink operations, resiliency, and recoveryDemultiplexingSinkTest
to verify sink and writer creation,DemultiplexingSinkStateSerializerTest
for verification of state serialization/deserialzation behavior, andDemultiplexingSinkWriterTest
to verify successful writing to single/multiple routesDemultiplexingSinkStateManagementTest
to verify snapshotting and restoration workflow for stateful and non-stateful sinks.DemultiplexingSinkIT
using the FlinkMiniClusterExtension
to verify supported behavior worked as expected.Verifying this change
This change added a suite of related tests covering various aspects of the
DemultiplexingSink
behavior pertaining to routing, snapshotting and recovery, and state serialization/deserialization as mentioned in the change log above. An overview of these cases can be listed below as well at a high-level file summary:DemultiplexingSinkTest
DemultiplexingSinkWriterTest
DemultiplexingSinkStateSerializerTest
DemultiplexingSinkStateManagementTest
DemultiplexingSinkIT
All of these tests consistently pass without issue - as do all of the existing suite of tests found throughout the project.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: Yes, specifically@PublicEvolving()
Documentation