Skip to content

Conversation

rionmonster
Copy link
Contributor

@rionmonster rionmonster commented Oct 1, 2025

What is the purpose of the change

Per the discussions within FLINK-24493, support does not presently exist for dynamically routing incoming elements to specific sinks at runtime, based on some characteristic of the incoming element. This pull request addresses that by implementing a new DemultiplexingSink construct that will support such behavior.

The sink implementation consists of two components: the DemultiplexingSink itself along with a corresponding SinkRouter interface, whos implementation will govern how a given element will be mapped to a specific sink at runtime. These active routes (i.e. those that have been previously initialized) will be stored within the internal state of the sink to prevent unnecessary sink initialization for existing sinks, essentially functioning as a cache.

Visual Example

Screenshot 2025-09-30 at 11 29 02 PM

The example above demonstrates a simple DemultiplexingSink with a router configured to route each element to a destination that corresponds with the first character of the element (e.g. "apple" -> A, "banana" -> B, etc.).

Example Usage

An example demonstrating this behavior might look like the following:

// Define SinkRouter (for routing element to specific sink)
SinkRouter<YourElement, String> router = new SinkRouter<YourElement, String>() {
    @Override
    public String getRoute(YourElement element) {
        // Sink-key resolution (in this case get the name of the topic)
        return element.getTopicName();
    }

    @Override
    public Sink<YourElement> createSink(String topicName, YourElement element) {
        return KafkaSink.<YourElement>builder()
            .setBootstrapServers(...)
            .setRecordSerializer(...)
            .setTopics(topicName)
            .build();
    }
};

// Define the sink
DemultiplexingSink<YourElement, String> demuxSink =
    new DemultiplexingSink<>(router);

// Example applying the sink
streamEnv
    .process(YourBusinessLogic())
    .sinkTo(demuxSink);

This example demonstrates consuming a series of YourElement elements and routes them to dynamic Kafka topics based on a specific attribute within the object itself through the following chain of events:

  • Defining the SinkRouter instance responsible for defining the logic to route the element to its destination topic (in this case a predefined element.getTopicName() implementation)
  • Defining a DemultiplexingSink<YourElement, String> instance that uses the previously defined SinkRouter instance.
  • When the element is sent to the sink, the logic within SinkRouter.getRoute() will be executed to determine the sink to route the element to.
    • If the route key does not exist, the sink will be initialized and stored within the internal cache.
    • If the key exists, the sink will be read from the cache directly.
  • The element will be sent to the resolved sink.

NOTE: The SinkRouter implementation is not limited to String-based keys, so the above example could easily support dynamic routing to different Kafka brokers, topics, etc. Implementations using other popular sinks such as JDBC, Elasticsearch, etc. may likely require additional fields to resolve depending on the level of dynamic behavior required (e.g. including credentials, etc.)

Brief change log

  • Added DemultiplexingSink and related SinkRouter interface to support dynamic sink creation and routing at runtime.
  • Added supporting classes like DemultiplexingSinkState, DemultiplexingSinkStateSerializer, and DemultiplexingSinkWriter related to stateful sink operations, resiliency, and recovery
  • Added DemultiplexingSinkTest to verify sink and writer creation, DemultiplexingSinkStateSerializerTest for verification of state serialization/deserialzation behavior, and DemultiplexingSinkWriterTest to verify successful writing to single/multiple routes
    • Added DemultiplexingSinkStateManagementTest to verify snapshotting and restoration workflow for stateful and non-stateful sinks.
  • Added simple DemultiplexingSinkIT using the Flink MiniClusterExtension to verify supported behavior worked as expected.

Verifying this change

This change added a suite of related tests covering various aspects of the DemultiplexingSink behavior pertaining to routing, snapshotting and recovery, and state serialization/deserialization as mentioned in the change log above. An overview of these cases can be listed below as well at a high-level file summary:

File Summary
DemultiplexingSinkTest Covers sink, writer, and state serializer creation
DemultiplexingSinkWriterTest Covers routing, flushing, snapshotting, and watermarking
DemultiplexingSinkStateSerializerTest Covers state deserialization
DemultiplexingSinkStateManagementTest Covers snapshotting and state restoration
DemultiplexingSinkIT Covers Flink-backed routing via DemultiplexingSink

All of these tests consistently pass without issue - as do all of the existing suite of tests found throughout the project.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): No
  • The public API, i.e., is any changed class annotated with @Public(Evolving): Yes, specifically @PublicEvolving()
  • The serializers: No
  • The runtime per-record code paths (performance sensitive): No
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
  • The S3 file system connector: No

Documentation

  • Does this pull request introduce a new feature? Yes
  • If yes, how is the feature documented? Javadocs

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 1, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rionmonster
Copy link
Contributor Author

rionmonster commented Oct 1, 2025

@dmvk / @AHeise / @fapaul

Hey folks - it's been a while! I'm resurrecting an old discussion that we had within a JIRA ticket years ago (almost four years to the week) based on some recent work that I had to do with migrating some older code to use the Unified Sink APIs. I figured that while it was fresh in my mind, that I'd take a stab at generalizing it.

I'd love some eyes and feedback on it as I'm sure there's quite a bit of room for improvement.

Thanks!

Rion

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants