Add Kafka Streams data-flow engine (#119)
* Add README for dev setup for Kafka Streams data-flow subproject

This covers the developer setup for Kotlin, including a couple of potential
issues one might encounter, and some of the different options available.

* Add auto-generated gitignore

* Add initial project state

Auto-generated by IntelliJ.
Java and Kotlin versions bumped to desired values.

* Add config provider from CLI args and env vars

* Use lazy val for CLI config

This avoids reevaluation of config, as this should be static once a
real-world, containerised program has started.
This provides a 'safer' interface for users.
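
For illustration, a minimal sketch of the lazy-val pattern described above; the Config fields, argument handling, and defaults here are assumptions rather than the project's actual ones.

    data class Config(val kafkaBootstrapServers: String, val upstreamHost: String)

    class Cli(private val rawArgs: Array<String>) {
        // Evaluated once on first access, so parsing CLI args and env vars happens
        // exactly one time for the lifetime of the (containerised) process.
        val config: Config by lazy { parse(rawArgs) }

        private fun parse(args: Array<String>): Config =
            Config(
                kafkaBootstrapServers = args.getOrNull(0) ?: "localhost:9092",
                upstreamHost = args.getOrNull(1) ?: "localhost",
            )
    }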

* Make CLI-internal fields private

* Refactor env var prefix to const val

This may simplify testing.
The value may be made non-private in the future, to aid that purpose.

* Rename file to meet Kotlin coding conventions

https://kotlinlang.org/docs/coding-conventions.html#source-file-names

* Add basic logging config

* Refactor source files to fully-qualified package path

* Add logging for unrecognised command-line args

* Fix Gradle module & main class definitions

* Change default log sink to STDOUT

* Add logger configuration to main method

The 'sleep()' call is a hack to allow log messages to be written,
until we have some actual business logic defined.
This is because 'klogging' uses async logging, and the process
ends too quickly otherwise.
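
As a rough sketch of that stop-gap, assuming klogging's top-level `logger()` factory and suspend-style log calls (the exact API should be checked against the klogging docs), and with a hypothetical logger name:

    import io.klogging.logger
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val log = logger("io.seldon.dataflow.Main")
        log.info("starting data-flow engine")
        // Hack: keep the process alive long enough for the async log sink to flush.
        delay(2_000)
    }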

* Only warn of unrecognised CLI args if any exist

* Get CLI config in main method

* Add Kafka Streams & Protobuf dependencies

* Add TODOs for Kafka Streams handling

* Add draft of chainer protos & generated Kotlin files

* Regenerate Kotlin protos + include gRPC generated files

* Add symlink to Kotlin protos in data-flow subproject

* Add protoc/gRPC section to README

* Add additional steps on gRPC protobuf generation in README

Add download links for protoc plugins for Java & Kotlin.
Add example command for running 'protoc' with plugins.

* Add proto, gRPC, and coroutines deps for Gradle

* Update generated proto & gRPC stubs for chainer/data-flows

* Add stub for pipeline subscriber (client to scheduler)

* Add WIP stub impl. for Kafka Streams transformers

* Add inference v2 protobuf definitions

Copied directly from KServe:
https://github.com/kserve/kserve/blob/d1c24cd127523af1cc74d4e1536a816584699411/docs/predict-api/v2/grpc_predict_v2.proto

This is the same approach taken by Metronome.

* Add generated Kotlin/Java files for inference v2 protos

* Add Kafka Streams Protobuf serdes dependency to build.gradle

* Add local Maven cache as repo in build.gradle

* Regenerate chainer protos fresh

* Add inference V2 resp -> req conversion logic in chainer

* Remove meaningless fields from chainer resp -> req conversion

The name and version of a model do not apply to the next model in the pipeline.
The Kafka agent/worker knows how to set these fields anyway, so we can be efficient
by not duplicating work needlessly.

* Refactor chainer transformation idiomatically

* Rename method to reduce verbosity

* Implement init function for chainer + rename vars for clarity

* Add About section to data-flows README

Explain what this subproject aims to achieve and why that is useful.

* Group dependencies in build.gradle

* Launch chainer KStream topologies from subscription updates as required

* Update values expected by CLI

* Create and start subscriber in main thread

The coroutine scope handling is an MVP and should be improved.

* Add Netty as gRPC server runtime dependency in build.gradle

* Refactor CLI to object with more obvious API

The use of a lazy val in the CLI made accesses of config look like the constructor
was also an initialiser, implying the possibility of failure.
It was also unnecessary to create a new CLI instance each time when the logic is
really about config creation rather than the CLI itself.

* Add WIP stub chainer server to allow testing without real scheduler

* Add basic proxy chainer with pre-canned responses

* Add chainer config vars for proxy scheduler

* Start chainer server in proxy's main function

* Fix build errors with proxy chainer

* Fix Java source sets not being included

The generated Protobuf files are a mixture of Java and Kotlin in the same directories.
These directories are symlinked under 'src/main/kotlin', which breaks the Gradle
convention and leads it to believe there are no Java files to compile.
We need to help it find those Java files by specifying the source set.

* Add Kotlin compiler option to silence opt-in warnings from generated Protobuf files

* Add TODO for missing info in chainer protos

* Add MVP implementation of scheduler/chainer subscriber

* Update chainer subscriber to match new protos with multiple steps per update message

* Log out update messages for debugging

* Create all chainers before spawning each

* Drop duplicate pipeline requests

* Remove done TODO

* Remove now-superfluous sleep from main method

* Optimise imports

* Remove flowOn & launchIn calls on main flow

The change to the Dispatchers.IO context is unnecessary,
as the call to start a Kafka Stream is non-blocking,
thus the work of the flow is more CPU-bound than anything else.
The launch of the flow in a different scope is downright harmful,
as it moves the flow from the scope of the runBlocking call in 'main',
so 'main' finishes its work (starting a new scope) very quickly and returns,
thereby exiting the entire program before the new scope has a chance to run.
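
A simplified illustration of the two behaviours described above (the flow contents are made up; the shape of the problem is what matters):

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.flow.launchIn
    import kotlinx.coroutines.flow.onEach
    import kotlinx.coroutines.runBlocking

    fun updates(): Flow<String> = flow {
        repeat(3) {
            emit("update-$it")
            delay(100)
        }
    }

    // Harmful: the flow is launched in a scope unrelated to runBlocking, so 'main'
    // returns almost immediately and the process can exit before anything is handled.
    fun brokenMain() = runBlocking {
        updates().onEach { println(it) }.launchIn(CoroutineScope(Dispatchers.Default))
    }

    // Correct: collecting inside runBlocking keeps 'main' alive until the flow completes.
    fun main() = runBlocking {
        updates().collect { println(it) }
    }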

* Send duplicated calls from proxy scheduler/chainer

This tests that the pipeline subscriber/client can handle:
* Multiple pipeline requests, concurrently
* Duplicated requests (by ignoring them)
* Multiple steps per request, with K-Streams topologies for each

* Refactor pipeline subscriber to switch on operation type

* Move TODO comments for legibility

* Add start method to Transformer interface

* Refactor pipeline subscriber to push per-op logic to handler funcs

* Add TODO re avoiding duplication of streams

* Refactor transformer creation into factory method

* Refactor transformers to multiple files

* Close K-Streams topologies when subscription ends

* Remove pipeline closing on subscription termination

If the scheduler goes away, we should continue to run but retry connecting
to it until a new one is available.

* Add shutdown hook to cancel pipelines

* Await sync termination of K-Streams, in parallel across streams

By using coroutines to await the individual K-Streams terminating/joining,
we can block on each termination whilst running them in parallel across streams.
We thereby give the K-Streams a chance to cleanly disconnect from Kafka.
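
A minimal sketch of that idea (not the exact code in this change): each KafkaStreams close is a blocking join, so wrapping the joins in coroutines on an I/O dispatcher lets them run in parallel while still being awaited.

    import java.time.Duration
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.async
    import kotlinx.coroutines.awaitAll
    import kotlinx.coroutines.coroutineScope
    import org.apache.kafka.streams.KafkaStreams

    suspend fun closeAll(streams: Collection<KafkaStreams>) = coroutineScope {
        streams
            .map { stream ->
                async(Dispatchers.IO) {
                    // Blocking join per stream, bounded by a timeout.
                    stream.close(Duration.ofSeconds(10))
                }
            }
            .awaitAll()
    }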

* Add basic (manually-checked) flow concurrency tests

This confirms the behaviour of flows with regard to concurrency and how to achieve it.

* Refactor coroutine parallel awaiting into type extension

* Send pipeline update events back to scheduler on shutdown

* Refactor pipeline update event creation to private method

* Add retry config for pipeline subscription

* Remove unused coroutine scope param for pipeline subscriber

* Tidy up imports

* Update Protobuf-generated Kotlin/Java files

* Remove (hopefully) done TODO

* Formatting

* Remove unused coroutine scope creation

* Add config option for num cores available to application

* Set num threads as double num cores available to app

This assumes threads will be blocked semi-frequently, so we want to
provide more schedulable threads to continue processing messages.
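
A sketch of that sizing rule, assuming `numCores` stands in for the new config value; the property name is Kafka Streams' standard thread setting.

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    fun streamsThreadConfig(numCores: Int): Properties =
        Properties().apply {
            // Twice the advertised core count, on the assumption that threads block
            // on I/O semi-frequently.
            setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, (numCores * 2).toString())
        }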

* Remove unused import

* Add link to example gRPC retry config

* Use coloured plaintext logging output

Whilst JSON-formatted logs are nice for parsing, more human-readable and coloured
logs tend to be convenient for, well, humans.
We continue to have custom context fields supported in this format.

* Add symlink to inference v2 generated files

* Change gRPC config values to strings to satisfy primitive type requirements

The gRPC parsing logic expects Java primitives, but the compiled types are 'java.lang.Integer'.
Instead, we can use their string equivalents, which are apparently parsed happily.

* Update expected source/sink topic format

* Rename constructor param for clarity

* Remove explicit gRPC max retry attempts as provided by config

* Move Kotlin subproject under scheduler top-level project

* Add Log4J as SLF4J impl for Kafka lib

* Add logger for chainers

* Use manual mapping to/from Protobuf types

The 'KafkaProtobufSerde' implementation _requires_ the use of the Kafka schema registry.
This is in spite of having the Proto definitions compiled into the application.
To avoid this inconvenience, we can manually map the Proto types to and from byte arrays,
although this likely has some level of performance overhead with allocations and GC.
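
A minimal sketch of that manual mapping, applicable to any compiled Protobuf message type; the function name is illustrative.

    import com.google.protobuf.Message
    import com.google.protobuf.Parser
    import org.apache.kafka.common.serialization.Deserializer
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.serialization.Serializer

    // Round-trip Protobuf messages through byte arrays, with no schema registry involved.
    fun <T : Message> byteArraySerdeFor(parser: Parser<T>): Serde<T> =
        Serdes.serdeFrom(
            Serializer<T> { _, message -> message?.toByteArray() },
            Deserializer<T> { _, bytes -> bytes?.let { parser.parseFrom(it) } },
        )

Such a serde can then be passed wherever Kafka Streams expects one, e.g. via `Consumed.with(...)`/`Produced.with(...)` for the generated inference v2 types (assuming their generated class names).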

* Import specific Proto types for convenience & legibility

* Use plaintext connection to Kafka

This should be upgraded to TLS ('SSL') in future, but for local testing this is
the expedient solution.

* Add TODO for filtering per chainer

* Whitespace

* Update coroutine/flow investigation tests

* Use the same source/sink topics in the proxy scheduler as elsewhere

* Add Jupyter notebook for data-flows MVP testing

MVP = scheduler (proxy) provides control info, data-flow transformer processes it, data successfully propagates from one topic to another.

* Add Go Kafka producer for testing data-flow engine

* Replace log4j with logback

* Run local Kafka in foreground not daemon mode

* Add dev/test settings for Kafka to speed up writes

* Add handling for deletion of existing pipeline

* Add params for making pipeline update events

* Remove done TODOs

* Formatting

* Add function to parse source into topic & tensor name

* Update data-flow notebook to better highlight operations

* Migrate to same inference v2 Protobuf contracts used by Go

Rather than using the KServe definitions, instead use those from MLServer.
This provides a slightly simpler definition, without raw payload contents.

* Update JVM package for inference v2 Protobuf contract

* Remove KServe-Protobuf generated files

* Add symlink to inference v2 generated Protobuf files

* Refactor publishing in Kafka producer script + use 2 topics

* Use klogging SLF4J interface for Apache packages

Configure Apache/Kafka to only log at or above warnings,
as Kafka output is highly verbose at 'info' level.

* Add log when shutting down

* Propagate pipeline cancellation reasons

* Improve logging

* Reinstate multiple steps in pre-canned proxy scheduler pipeline response

* Add logging fields for pipeline update events in proxy scheduler

* Update notebook for 2 pipeline steps with independent inputs & outputs

* Clear Kafka Streams state store on startup

This is a convenient setting for local testing, but might need to be
removed for production environments to allow for faster startup.

* Add TODO for Streams/Kafka handling

* Add app ID (consumer group) override per transformer

It seems that Kafka Streams allows only a single topology to be processed per app ID.
As each step in a pipeline will almost always constitute its own topology
(due to steps being disjoint because of models/processors in between them),
that would be a severe limitation on our ability to run pipelines.
Thus, we need to construct a per-step (topology) app ID to work around this limitation.

This implementation is hacky: the division of concerns feels wrong, and more importantly
the app ID is keyed by sink, which is almost certainly not unique.

* Refactor & improve Streams app ID handling

Move logic of overriding Kafka properties into an extension method.
Create a (hopefully) unique name based off operation type & topics involved.
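
A sketch of the kind of extension described above; the property copying is standard, but the naming scheme (and the "seldon-dataflow" prefix) is illustrative rather than the exact one used.

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    fun Properties.withStreamsAppId(operation: String, sources: List<String>, sink: String): Properties =
        Properties().also { copy ->
            copy.putAll(this)
            // One application ID (consumer group) per topology, so Kafka Streams
            // will run every pipeline step rather than just one.
            copy.setProperty(
                StreamsConfig.APPLICATION_ID_CONFIG,
                "seldon-dataflow-$operation-${sources.joinToString("-")}-$sink",
            )
        }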

* Add pipeline-header filtering for chainers

This will likely be refactored to a general, reusable component,
but first needs to be tested.

* Propagate pipeline name for filtering

* Filter out null values after header filtering
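
A rough sketch of how the header filtering and null-dropping in the last few bullets can fit together in the Streams DSL; the header name "pipeline" and all class/function names here are assumptions. Record headers are only reachable via the processor context, hence the value transformer.

    import org.apache.kafka.streams.kstream.KStream
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey
    import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier
    import org.apache.kafka.streams.processor.ProcessorContext

    class PipelineHeaderFilter<K, V>(private val pipelineName: String) :
        ValueTransformerWithKey<K, V, V?> {

        private lateinit var context: ProcessorContext

        override fun init(context: ProcessorContext) {
            this.context = context
        }

        override fun transform(readOnlyKey: K, value: V): V? {
            val header = context.headers().lastHeader("pipeline") ?: return null
            val rawValue = header.value() ?: return null
            // Decode explicitly as UTF-8 to avoid platform-charset surprises.
            return if (rawValue.toString(Charsets.UTF_8) == pipelineName) value else null
        }

        override fun close() {}
    }

    fun <K, V : Any> KStream<K, V>.filterForPipeline(pipelineName: String): KStream<K, V> =
        this
            .transformValues(ValueTransformerWithKeySupplier<K, V, V?> { PipelineHeaderFilter(pipelineName) })
            .filterNot { _, value -> value == null }
            .mapValues { value -> value!! }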

* Add TODO comment for tensor propagation

* Add pipeline headers in producer script

* Fix string-decoding bug in header filter

* Add CLI args for producer script

This allows information to be passed in, rather than duplicating knowledge of, for example, topic names
and Kafka connection details.
There are sensible defaults where possible.

* Support writing both expected & unexpected pipeline headers in producer

This allows validation that messages with unexpected/unwanted headers are ignored correctly.

* Add TODO for supporting more than inference v2 Protobuf data

* Increase threads per core to support more pipelines

* Refactor header filter to separate file

This filter is not specific to chainers so should not live in the same location.

* Use type aliases for Kafka key & value types
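
For illustration, aliases along these lines (the concrete types are assumptions, based on the string topic keys and byte-array Protobuf values used elsewhere in this change):

    typealias TRecordKey = String
    typealias TRecordValue = ByteArray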

* Add logic & tests for identifying if tensors are from same model/input topic

* Parse sources as topics/tensors rather than validating if tensors are a subset projection

* Add function to parse a list of sources into topics & tensors

* Refactor source-parsing to use when statement

* Add sealed class for types of source projections

* Refactor source-parsing function for simplicity

* Add empty case for source-parsing logic

* Simplify transformer factory using source projection types
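
A hypothetical sketch of the sealed projection types and `when`-based dispatch described in the bullets above; the real case and type names will differ.

    sealed class SourceProjection {
        object Empty : SourceProjection()
        data class Single(val topic: String) : SourceProjection()
        data class SingleSubset(val topic: String, val tensors: Set<String>) : SourceProjection()
        data class Many(val topics: Set<String>) : SourceProjection()
    }

    fun describeTransformerFor(sources: SourceProjection): String =
        when (sources) {
            is SourceProjection.Empty ->
                throw IllegalArgumentException("at least one source is required")
            is SourceProjection.Single -> "chainer over ${sources.topic}"
            is SourceProjection.SingleSubset -> "chainer over ${sources.tensors} from ${sources.topic}"
            is SourceProjection.Many -> "joiner over ${sources.topics}"
        }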

* Add JUnit Jupiter dependency to Gradle build

* Tidy up transformer test file

* Add basic test for transformer factory - no sources should throw exception

* Migrate tensor-parsing tests to Strikt assertion library

* Migrate transformer factory tests to Strikt assertion library

* Simplify transformer factory test

* Move assertion extension to outside test class

* Refactor stream topology creation to lazy delegate for chainers

* Improve transformer factory test + add helper method

* Add test cases for transformer factory test

* Use Joiner for transformer factory when multiple sources are present

* Add TODO for better testing

* Refactor default test values + add equality comparison for transformers

This custom equality code allows us to selectively include/exclude fields.
For example, we might not want to compare Kafka properties as these are not an integral part of a transformer, but rather an implementation detail to be propagated.

* Add factory method for test chainers

* Add equality case for Joiners in test helper method

* Increase visibility of chainer fields for testing

* Filter chained tensors if a subset thereof is specified

* Move nullable-tensors method to separate file

* Formatting

* Use string for Kafka topic keys

* Add second tensor in producer script

This allows for projecting specific tensors in the pipeline subscription.
Also fix incorrect tensor shapes.

* Project specific tensor for second input topic

This allows testing that the data-flow engine can handle specific tensors as well as entire input messages.

* Refactor main method & arg-parsing in producer script to main file

Also refactor hard-coded strings to package-level constants.

* Fix incorrect type for model version in Kafka scripts

* Rename var for stylistic consistency

* Move inference v2 message creation from producer script

The intent is to move the inference v2 handling into a centralised location
for both produced and expected (consumed) messages to make comparisons easier.

* Add function to parse consumed messages as inference v2 responses

* Fix calls to moved functions, as they are no longer methods

* Add args for output topics as well as input ones

* Add consumer Go script

The intent of this is to ensure the transformed messages on the output topic(s)
are in the expected format and (in the future) have the expected values.

* Add TODO for (local/dev) default CLI args

* Send pipeline update event when any steps cannot be created

* Add properties for local/dev default config in data-flow engine

* Rename CLI arg for better namespacing

* Add function to generate expected inference v2 response messages

* Fix incorrect type in Go testing script

* Rename value to avoid shadowing type

* Rename consumer constructor/initialiser to avoid confusion

* Add close method to Go consumer

* Refactor number of produced messages per topic to package level const

* Run consumer in main Go script

For use in a notebook, it may be better to split the producer and consumer again.
In that case, the argument-parsing and boilerplate logic could be moved to reusable functions.

* Add consumer group ID to Go consumer script

* Fix missing loop increment in consumer script

* Add errors to logs in producer/consumer Go scripts

* Fix erroneously swapped request/response types

The data-flow engine should be reading responses from one model and writing requests for the next.
However, the Go scripts were getting these the wrong way round.
The fields in use in the messages were compatible, hence no errors were thrown on parsing,
but they were confusing and did not represent the intended behaviour.

* Rename tensors in Go scripts for clarity

* Rename input & output topics in chainer of proxy scheduler

* Update data-flows notebook

Rename topics.
Add appropriate arguments for Go scripts.
Re-run cells.
Add informational comments to explain expected behaviour.

* Add topic name to consumer script logs

* Add TODO in chainer of proxy scheduler

* Remove debugging logs from Kotlin header filter

* Regenerate chainer/pipeline Proto/gRPC JVM files

* Regenerate inference v2 Proto/gRPC JVM files

* Fix transformer factory tests to match the new, nullable result

* Rename test function to avoid IntelliJ complaints about platform compatibility

* Refactor single main method to be one per script

* Rename Go Kafka script to reflect refactoring of main method(s)

* Require only input or output topics for Go scripts but not both

* Add Make targets for Go scripts

* Update data-flow notebook for separate producer & consumer scripts

* Remove renamed, optional field in chainer of proxy scheduler

* Disable linting of multiple main methods in Go Kafka scripts

* Handle errors from chainer service in proxy scheduler

agrski authored Mar 27, 2022
1 parent c90188a commit 34a30d1
Showing 61 changed files with 36,293 additions and 3 deletions.
6 changes: 5 additions & 1 deletion apis/mlops/chainer/chainer.proto
@@ -27,6 +27,9 @@ message PipelineStepUpdate {
Unknown = 0;
Inner = 1;
}
// https://docs.google.com/document/d/1tX-uaOvngx1RpEyWEZ4EbEcU8D0OgYuRWVb2UAi85n4/edit
// Pipeline Resource example, e.g. transform.outputs.traffic
// seldon.<namespace>.<model name>.<inputs|outputs>.<tensor name>
repeated string sources = 1;
string sink = 2;
PipelineJoinType ty = 3;
@@ -35,6 +38,7 @@
}

message PipelineUpdateStatusMessage {
// TODO - include `name` to identify transformer message comes from
PipelineUpdateMessage update = 1;
bool success = 2;
string reason = 3;
@@ -46,4 +50,4 @@
service Chainer {
rpc SubscribePipelineUpdates(PipelineSubscriptionRequest) returns (stream PipelineUpdateMessage) {};
rpc PipelineUpdateEvent(PipelineUpdateStatusMessage) returns (PipelineUpdateStatusResponse) {};
}
}
343 changes: 343 additions & 0 deletions apis/mlops/chainer/kotlin/io/seldon/mlops/chainer/ChainerGrpc.java

Large diffs are not rendered by default.
