Embedded Kafka support in OpenWhisk Standalone mode #4628

Merged
merged 11 commits into from
Sep 19, 2019

Conversation

Member

@chetanmeh chetanmeh commented Sep 17, 2019

Adds support for launching Embedded Kafka and Kafka Drop UI

Description

This PR adds support for launching an embedded Kafka. This is mostly meant to enable local development of features that need Kafka support (such as User Event or the Activation Persister Service). In this mode an in-process Kafka server (currently v2.1.1) and a Zookeeper server are launched.

$ ./gradlew :core:standalone:build
$ java -jar bin/openwhisk-standalone.jar --kafka

# Or just run from OpenWhisk home dir
$ ./gradlew :core:standalone:bootRun --args='--kafka'

Key aspects

  • The embedded Kafka is version 2.1.1, while we otherwise run with 0.11.0.1. I tried to use 0.11.0.1, but it causes class compatibility issues with the newer version of the Kafka client being used
  • This mode is also useful to simulate some of the integration test scenarios with Activation Persister Service
  • The launched Kafka server supports connections both from the host and from within Docker containers, based on the Kafka listeners explanation (https://rmoff.net/2018/08/02/kafka-listeners-explained/)
  • Kafka and Zookeeper generated files are stored in ~/.openwhisk/standalone/server-3233/tmp and are removed upon restart

Once launched, the ports in use and related details are printed as below

[ 9092  ] localhost:9092 (kafka)
[ 9092  ] 192.168.65.2:9091 (kafka-docker)
[ 2181  ] Zookeeper (zookeeper)
[ 9000  ] http://localhost:9000 (whisk-kafka-drop-ui)

Kafka UI - Kafdrop 3

This PR also adds support for optionally launching a Kafdrop 3 UI

java -jar openwhisk-standalone.jar --kafka --kafka-ui

This UI helps a developer easily understand the structure of the messages exchanged on the various topics.

image

Related issue and scope

  • I opened an issue to propose and discuss this change (#????)

My changes affect the following components

  • API
  • Controller
  • Message Bus (e.g., Kafka)
  • Loadbalancer
  • Invoker
  • Intrinsic actions (e.g., sequences, conductors)
  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • CLI
  • General tooling
  • Documentation

Types of changes

  • Bug fix (generally a non-breaking change which closes an issue).
  • Enhancement or new feature (adds new functionality).
  • Breaking change (a bug fix or enhancement which changes existing behavior).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • My changes require further changes to the documentation.
  • I updated the documentation where necessary.

codecov-io commented Sep 17, 2019

Codecov Report

Merging #4628 into master will increase coverage by 35.54%.
The diff coverage is n/a.


@@             Coverage Diff             @@
##           master    #4628       +/-   ##
===========================================
+ Coverage   43.31%   78.85%   +35.54%     
===========================================
  Files         183      183               
  Lines        8305     8305               
  Branches      574      573        -1     
===========================================
+ Hits         3597     6549     +2952     
+ Misses       4708     1756     -2952
Impacted Files Coverage Δ
...openwhisk/common/tracing/OpenTracingProvider.scala 21.15% <0%> (+1.92%) ⬆️
...re/database/MultipleReadersSingleWriterCache.scala 98% <0%> (+2%) ⬆️
...apache/openwhisk/core/entitlement/Collection.scala 87.5% <0%> (+2.5%) ⬆️
.../org/apache/openwhisk/http/PoolingRestClient.scala 91.17% <0%> (+2.94%) ⬆️
...abase/cosmosdb/CosmosDBArtifactStoreProvider.scala 4% <0%> (+4%) ⬆️
...tainerpool/docker/DockerClientWithFileAccess.scala 96% <0%> (+4%) ⬆️
...whisk/connector/kafka/KafkaConsumerConnector.scala 57.74% <0%> (+4.22%) ⬆️
...enwhisk/core/loadBalancer/InvokerSupervision.scala 95.77% <0%> (+4.92%) ⬆️
...pache/openwhisk/core/entity/ConcurrencyLimit.scala 94.11% <0%> (+5.88%) ⬆️
...la/org/apache/openwhisk/core/entity/LogLimit.scala 94.11% <0%> (+5.88%) ⬆️
... and 123 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 06362ad...03b99e4.

@@ -29,6 +29,8 @@

<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
Member

I guess the existing definition for org.apache.kafka is for Kafka clients (consumer, producer)?

Member Author

Yes


//Below setting based on https://rmoff.net/2018/08/02/kafka-listeners-explained/
// We configure two listeners where one is used for host based application and other is used for docker based application
// to connect to Kafka server running on host
Member

It would be helpful to add some more detail - for example, controller / invoker will use LISTENER_LOCAL since they run in the same JVM as the embedded Kafka and Kafka UI will run in a Docker container and use LISTENER_DOCKER.

Member Author

👍 Done
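As a rough illustration of the two-listener setup discussed in this thread, the sketch below builds the broker settings for a host-facing listener (LISTENER_LOCAL) and a Docker-facing listener (LISTENER_DOCKER). The property keys are standard Kafka broker settings. The object name, the method name, the localhost bind addresses and the PLAINTEXT protocol are assumptions made for the example, not code taken from this PR.

```scala
// Hypothetical helper, not part of the PR: shows how two named listeners
// can be advertised on different addresses.
object KafkaListenerSketch {

  def brokerListenerConfig(kafkaPort: Int, kafkaDockerPort: Int, dockerHostIp: String): Map[String, String] =
    Map(
      // Both listeners bind on the host (assumption: localhost as bind address).
      "listeners" ->
        s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://localhost:$kafkaDockerPort",
      // Clients are told to reconnect on the address matching the listener they used:
      // in-JVM clients get localhost, containers get the host IP as seen from Docker.
      "advertised.listeners" ->
        s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://$dockerHostIp:$kafkaDockerPort",
      // Custom listener names must be mapped to a security protocol (assumption: PLAINTEXT).
      "listener.security.protocol.map" ->
        "LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
      "inter.broker.listener.name" -> "LISTENER_LOCAL")
}
```

With the values from the PR description (9092, 9091 and 192.168.65.2), the controller and invoker would connect through LISTENER_LOCAL at localhost:9092, while a container such as the Kafka UI would be handed 192.168.65.2:9091.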

val kafkaPort = opt[Int](
descr = "Kafka port. If not specified then 9092 or some random free port (if 9092 is busy) would be used",
noshort = true,
required = false)
Member

Would it make sense to add a default = Some(9092) here?

Member Author

This is done on purpose. Port 9092 is more of a "preferred" port. If no explicit option is provided, the system tries to start Kafka on 9092 by default. However, if that port is busy, a random free port is selected.

This ensures that Standalone OpenWhisk requires as few fixed free ports as possible (for now only 3233). All other services can be started on any port, and dependent services are then configured to use the randomly selected port.

If a user explicitly provides a port, like --kafka-port 9010, then Kafka is started on that port. If the port is busy, Kafka fails to start. This support is mostly meant for tests that need to know where Kafka will start.
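The port selection rule described in this comment can be sketched roughly as follows. The name getPort matches the call visible in the diff (getPort(conf.kafkaPort.toOption, 9092)), but the body, the enclosing object and the helper functions are assumptions written for illustration, not the PR's implementation.

```scala
import java.net.ServerSocket
import scala.util.Try

// Illustrative sketch only; the real getPort in the PR may differ.
object PortSelectionSketch {

  // A port counts as free if a server socket can be bound to it right now.
  def isFree(port: Int): Boolean =
    Try { val socket = new ServerSocket(port); socket.close() }.isSuccess

  // Binding to port 0 asks the OS for any free port.
  def randomFreePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }

  // An explicitly configured port is used as-is (startup fails later if it is busy).
  // Otherwise the preferred port is tried first, with a random free port as fallback.
  def getPort(configured: Option[Int], preferred: Int): Int =
    configured.getOrElse(if (isFree(preferred)) preferred else randomFreePort())
}
```

Note that a check like this is inherently racy: another process could take the port between the check and the actual server start, so it is a best-effort choice rather than a guarantee.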

Member

I understand. Background of my proposal was that there is some duplication of the default ports. We have the port number 9092 for Kafka here AND in startKafka(). Wouldn't it make sense to define a val defaultKafkaPort = 9092 and use it to build the description string and in startKafka()?

Member Author

👍 Done that now

descr = "Kafka port for use by docker based services. If not specified then 9091 or some random free port " +
"(if 9091 is busy) would be used",
noshort = true,
required = false)
Member

Would it make sense to add a default = Some(9091) here?

val zkPort = opt[Int](
descr = "Zookeeper port. If not specified then 2181 or some random free port (if 2181 is busy) would be used",
noshort = true,
required = false)
Member

Would it make sense to add a default = Some(2181) here?

val dir = new File(workDir, name)
FileUtils.forceMkdir(dir)
Directory(dir)
}
Member

Is embedded Kafka writing topic message logs to this work directory? If yes, do we clean up this data when shutting the standalone openwhisk down? If not, there is a chance that this directory grows large depending on Kafka retention policy and number of activations... If this data is only cleaned up on system reboot, the directory can grow large over time.

Member Author

Currently the directory is cleaned when the standalone jar restarts (not on system restart). For now the files are not deleted upon exit.
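A minimal sketch of the clean-on-restart behaviour mentioned here, assuming the work directory is simply wiped and recreated at startup. The diff itself uses commons-io FileUtils; this version sticks to the JDK so it is self-contained, and the names WorkDirSketch, deleteRecursively and freshDir are hypothetical.

```scala
import java.io.File

// Hypothetical helper, not taken from the PR.
object WorkDirSketch {

  // listFiles() returns null for plain files and missing paths, hence the Option wrapper.
  def deleteRecursively(file: File): Unit = {
    Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    file.delete()
  }

  // Removes whatever a previous run left behind and returns an empty directory.
  def freshDir(workDir: File, name: String): File = {
    val dir = new File(workDir, name)
    deleteRecursively(dir)
    if (!dir.mkdirs()) throw new IllegalStateException(s"Could not create $dir")
    dir
  }
}
```

Cleaning at startup rather than at exit means leftover Kafka and Zookeeper files stay on disk between runs, which is the growth concern raised in the question above.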

as: ActorSystem,
ec: ExecutionContext,
materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
val kafkaPort = getPort(conf.kafkaPort.toOption, 9092)
Member

If we add a default value to the kafkaPort option, we can probably solve this differently and not repeat the port number literal here?

Member Author

Ack. Done as you suggested by moving defaults to constant and reusing that across various places
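The refactoring agreed on in this thread could look something like the sketch below: the default ports live in one place and are reused both for the option descriptions and at startup. Only defaultKafkaPort is named in the review; the object name, the other constant names and the portDescr helper are assumptions for illustration.

```scala
// Hypothetical layout; the constants in the merged PR may be named differently.
object StandalonePortDefaults {
  val defaultKafkaPort = 9092
  val defaultKafkaDockerPort = 9091
  val defaultZkPort = 2181

  // Builds the option description from the constant so the number is written only once.
  def portDescr(service: String, default: Int): String =
    s"$service port. If not specified then $default or some random free port " +
      s"(if $default is busy) would be used"
}
```

The same constant would then be passed at startup, for example getPort(conf.kafkaPort.toOption, StandalonePortDefaults.defaultKafkaPort), instead of repeating the literal 9092.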

@@ -93,6 +95,7 @@ trait StandaloneServerFixture extends TestSuite with BeforeAndAfterAll with Stre
manifestFile.foreach(FileUtils.deleteQuietly)
serverProcess.destroy()
}
FileUtils.forceDelete(new File(dataDirPath))
Member

I guess this answers my question from above regarding data deletion when the process exits...

Member

At least for tests...

Member

@sven-lange-last sven-lange-last left a comment

Thanks for providing this pull request which enables developers to verify Kafka related scenarios with standalone openwhisk. Looks good to me - although I have to admit that I didn't try to run these changes and I never used standalone openwhisk up to now. So I may have missed something...

Overall, everything looks reasonable - only minor comments.

@chetanmeh
Member Author

Thanks Sven for the review!

@chetanmeh chetanmeh merged commit 94fccbc into apache:master Sep 19, 2019
Member

@rabbah rabbah left a comment

Sorry I didn't get to this sooner. LGTM.

java -jar openwhisk-standalone.jar --kafka
```

It also supports launching a Kafka UI based on [Kafdrop 3][6] which enables seeing the topics created and structure of messages
Member

[Kafdrop 3][6] -> is this supposed to link to two different places, "3" and "6"?

Member Author

No, that's the actual name: Kafdrop 3

```

By default the ui server would be accessible at http://localhost:9000. In case 9000 port is busy then a random port would
be selected. TO find out the port look for message in log like below (or grep for `whisk-kafka-drop-ui`)
Member

TO -> To

BillZong pushed a commit to BillZong/openwhisk that referenced this pull request Nov 18, 2019
Enable support for Embedded Kafka to enable Kafka based feature development. It also support launching Kafdrop 3 based Ui for visualizing Kafka related metadata.