KafkaStreams is the interface for managing and inspecting the execution environment of the processing topology of a Kafka Streams application.
Note: A KafkaStreams instance is also referred to as a streams client (esp. in logs).
A KafkaStreams instance (process) can be started or closed (shut down). The current state is available using state.
KafkaStreams is running when the state is either RUNNING or REBALANCING.
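The running check described above can be sketched as a small Java enum. This is a hypothetical model, not the Kafka Streams source: the name ClientState and the placeholder NOT_RUNNING state are assumptions made for illustration only.

```java
// Hypothetical model of the client states; not the Kafka Streams source.
enum ClientState {
    CREATED,      // constructed but not started yet
    REBALANCING,  // started, partitions are being reassigned
    RUNNING,      // started and processing records
    NOT_RUNNING;  // assumed placeholder for any non-running state after shutdown

    // "Running" covers both steady-state processing and an in-progress rebalance.
    boolean isRunning() {
        return this == RUNNING || this == REBALANCING;
    }
}
```

With such a model, a caller can treat REBALANCING as a normal, transient part of a running client rather than as a failure.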
There could be many KafkaStreams instances running simultaneously (e.g. as separate JVM processes, each with its own StreamThreads). While a KafkaStreams instance is running, it allows for inspecting streams metadata using the allMetadata, allMetadataForStore, and metadataForKey methods.
A KafkaStreams instance exposes monitoring metrics (incl. those of the Kafka Producer, Consumers, and AdminClient).
Only when in CREATED state can a KafkaStreams instance be registered with StateRestoreListeners, StateListeners, and UncaughtExceptionHandlers.
Method | Description |
---|---|
Collection<StreamsMetadata> allMetadata() | StreamsMetadatas for all instances in a multi-instance Kafka Streams application |
Collection<StreamsMetadata> allMetadataForStore(String storeName) | |
void cleanUp() | Cleans up the local directory with state stores |
void close(), boolean close(Duration timeout) | Closes the instance |
Set<ThreadMetadata> localThreadsMetadata() | |
StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer), StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner) | |
Map<MetricName, ? extends Metric> metrics() | Kafka monitoring metrics of the instance |
void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener) | Registers a global StateRestoreListener |
void setStateListener(KafkaStreams.StateListener listener) | Registers a KafkaStreams.StateListener |
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) | Registers a java.lang.Thread.UncaughtExceptionHandler |
void start() | Starts the instance |
State state() | The current state of the instance |
T store(String storeName, QueryableStoreType<T> queryableStoreType) | |
KafkaStreams is simply a Kafka client that consumes messages from and produces the processing results to Kafka topics (abstracted as SourceNodes and SinkNodes, respectively).
Note: A Kafka Streams developer describes the processing logic using a Topology directly (that is, a graph of processors) or indirectly through a StreamsBuilder that provides the high-level DSL to define transformations and build a stream processing topology.
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
// Placeholders: supply your own topology and configuration
val topology: Topology = ???
val config: StreamsConfig = ???
val ks = new KafkaStreams(topology, config)
Once created, KafkaStreams is started to begin consuming, processing, and producing records (as described by the Topology).
ks.start()
KafkaStreams can be given a StateRestoreListener (only when in CREATED state) to be informed about the state-related events: onRestoreStart, onBatchRestored and onRestoreEnd (through DelegatingStateRestoreListener).
import org.apache.kafka.streams.processor.StateRestoreListener
val userRestoreListener: StateRestoreListener = ???
// Must be called while ks is still in CREATED state, i.e. before ks.start()
ks.setGlobalStateRestoreListener(userRestoreListener)
KafkaStreams uses an InternalTopologyBuilder for the following:
- Creating a StreamsMetadataState, StreamThreads and QueryableStoreProvider
- Requesting a global processor topology (for a GlobalStreamThread)
KafkaStreams uses stream-client [client.id] for the log prefix (with the clientId).
Tip: Enable logging for KafkaStreams. Refer to Application Logging Using log4j.
void cleanUp()
cleanUp simply requests StateDirectory to clean when KafkaStreams is not running.
cleanUp throws an IllegalStateException when KafkaStreams is running:
Cannot clean up while running.
void close() // (1)
synchronized boolean close(final long timeout, final TimeUnit timeUnit)
(1) Calls close(final long timeout, final TimeUnit timeUnit) with 0 timeout
close …FIXME
Important: Always execute close on a KafkaStreams instance, even if you never call start, to avoid resource leaks.
// public API
KafkaStreams(
final Topology topology,
final Properties props) // (1)
// public API (mostly for testing)
KafkaStreams(
final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier) // (3)
KafkaStreams(
final Topology topology,
final Properties props,
final Time time) // (4)
// private/internal API
KafkaStreams(
final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) // (5)
KafkaStreams(
final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
final Time time) // (6)
- Calls the internal KafkaStreams (5) with a new DefaultKafkaClientSupplier
- Calls the internal KafkaStreams (6) with SystemTime
KafkaStreams takes the following to be created:
- InternalTopologyBuilder
- StreamsConfig
- KafkaClientSupplier
- Time
KafkaStreams initializes the internal properties.
While being created, KafkaStreams …FIXME
KafkaStreams requests the input KafkaClientSupplier for a Kafka AdminClient (for the AdminClient configuration for the clientId).
boolean setRunningFromCreated()
setRunningFromCreated …FIXME
Note: setRunningFromCreated is used exclusively when KafkaStreams is started.
synchronized void start()
throws IllegalStateException, StreamsException
start starts the Topology (which in turn starts consuming, processing, and producing records).
Internally, start prints out the following DEBUG message to the logs:
Starting Streams client
start marks KafkaStreams as running (i.e. transitions from CREATED to RUNNING state and notifies StateListeners).
start starts the global stream thread if defined (which is when…FIXME).
start starts the stream threads.
start schedules a thread that requests StateDirectory to cleanRemovedTasks every state.cleanup.delay.ms milliseconds.
You should see the following DEBUG message in the logs:
Started Streams client
If changing the state to running fails, start merely prints out the following ERROR message to the logs:
Already stopped, cannot re-start
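The start sequence described above (a one-way CREATED to RUNNING transition followed by a periodic cleanup task) can be sketched with the JDK's ScheduledExecutorService. This is a hypothetical model and not the Kafka Streams source: the class StartSketch, its boolean return value, the NOT_RUNNING state, and the latch that stands in for the real cleanup work are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the start sequence; not the Kafka Streams source.
final class StartSketch {
    enum State { CREATED, RUNNING, NOT_RUNNING }

    private State state = State.CREATED;

    // Single daemon thread that runs the periodic cleanup.
    private final ScheduledExecutorService cleaner =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "cleanup-sketch");
            t.setDaemon(true);
            return t;
        });

    // Stand-in for the real cleanup work: counts the first two cleanup runs.
    private final CountDownLatch cleanups = new CountDownLatch(2);

    // Succeeds only for the CREATED -> RUNNING transition.
    private boolean setRunningFromCreated() {
        if (state != State.CREATED) return false;
        state = State.RUNNING;
        return true;
    }

    // Returns whether the client was started (the real method returns void).
    synchronized boolean start(long cleanupDelayMs) {
        if (!setRunningFromCreated()) {
            System.err.println("Already stopped, cannot re-start");
            return false;
        }
        // First run after cleanupDelayMs, then again every cleanupDelayMs.
        cleaner.scheduleAtFixedRate(
            cleanups::countDown, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized void close() {
        state = State.NOT_RUNNING;
        cleaner.shutdownNow();
    }

    synchronized State state() { return state; }

    // Waits until the cleanup task has run at least twice, or the timeout elapses.
    boolean awaitTwoCleanups(long timeoutMs) {
        try {
            return cleanups.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch a closed client cannot be started again: a second start call after close finds the state is no longer CREATED, logs the error, and schedules nothing.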
void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener)
setGlobalStateRestoreListener registers a StateRestoreListener (in a Kafka Streams application).
Internally, setGlobalStateRestoreListener simply sets the globalStateRestoreListener internal property to be the input StateRestoreListener (only when in CREATED state).
setGlobalStateRestoreListener throws an IllegalStateException when not in CREATED state:
Can only set GlobalStateRestoreListener in CREATED state. Current state is: [state]
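The "configure only before start" rule can be sketched as a state guard. This is a hypothetical model, not the Kafka Streams source: the class GuardedClient is an assumption, and a plain Runnable stands in for a StateRestoreListener so that the example needs nothing beyond the JDK.

```java
// Hypothetical model of a "configure only in CREATED state" guard; not the Kafka Streams source.
final class GuardedClient {
    enum State { CREATED, RUNNING }

    private State state = State.CREATED;
    private Runnable restoreListener; // stand-in for a StateRestoreListener

    // Accepts the listener only while the client has not been started.
    synchronized void setGlobalStateRestoreListener(Runnable listener) {
        if (state != State.CREATED) {
            throw new IllegalStateException(
                "Can only set GlobalStateRestoreListener in CREATED state. Current state is: " + state);
        }
        restoreListener = listener;
    }

    synchronized void start() {
        state = State.RUNNING;
    }

    synchronized boolean hasRestoreListener() {
        return restoreListener != null;
    }
}
```

Registering the listener before start succeeds, while the same call after start fails fast with the message shown above, which is why listeners should be wired up right after construction.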
Collection<StreamsMetadata> allMetadata()
allMetadata makes sure that KafkaStreams is running and requests the StreamsMetadataState for metadata.
void validateIsRunning()
validateIsRunning throws an IllegalStateException when KafkaStreams is not running. Otherwise, validateIsRunning does nothing.
KafkaStreams is not running. State is [state].
Note: validateIsRunning is used when KafkaStreams is requested to allMetadata, allMetadataForStore, metadataForKey (both variants), store, and localThreadsMetadata.
Name | Description |
---|---|
| Used for the following: |
globalStateRestoreListener | A user-defined global StateRestoreListener to be notified about the state-related events: onRestoreStart, onBatchRestored and onRestoreEnd (through DelegatingStateRestoreListener). Set using setGlobalStateRestoreListener method |
| A single-threaded executor. Used to schedule a periodic action that requests the StateDirectory to cleanRemovedTasks after and every state.cleanup.delay.ms milliseconds (and only when the state is …). Initialized when … |
| Kafka AdminClient (that allows for managing and inspecting topics, brokers, configurations and ACLs) |
| Client ID that is initialized when … |
| StreamsMetadataState (with the InternalTopologyBuilder and application.server configuration property). Initialized when … |