Embedded Kafka support in OpenWhisk Standalone mode #4628
Codecov Report
@@ Coverage Diff @@
## master #4628 +/- ##
===========================================
+ Coverage 43.31% 78.85% +35.54%
===========================================
Files 183 183
Lines 8305 8305
Branches 574 573 -1
===========================================
+ Hits 3597 6549 +2952
+ Misses 4708 1756 -2952
This is needed for tests to work with a known port prior to start
77178b2 to 4d45b09
@@ -29,6 +29,8 @@

<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
I guess the existing definition for `org.apache.kafka` is for Kafka clients (consumer, producer)?
Yes
// Below setting based on https://rmoff.net/2018/08/02/kafka-listeners-explained/
// We configure two listeners where one is used for host based application and other is used for docker based application
// to connect to Kafka server running on host
It would be helpful to add some more detail - for example, controller / invoker will use `LISTENER_LOCAL` since they run in the same JVM as the embedded Kafka, and Kafka UI will run in a Docker container and use `LISTENER_DOCKER`.
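To make the two-listener idea concrete, here is a minimal sketch of the broker settings involved. The keys (`listeners`, `advertised.listeners`, `listener.security.protocol.map`, `inter.broker.listener.name`) are standard Kafka broker properties covered in the linked article; `KafkaListeners`, `listenerConfig` and the `dockerHost` parameter are hypothetical names used only for illustration, and the bind addresses are an assumption rather than what this PR necessarily configures.

```scala
object KafkaListeners {
  // Hypothetical helper: builds the listener-related broker properties.
  // kafkaPort       -> used by controller / invoker in the same JVM (LISTENER_LOCAL)
  // kafkaDockerPort -> used by Docker based services such as the Kafka UI (LISTENER_DOCKER)
  // dockerHost      -> hostname under which containers can reach the host (assumption)
  def listenerConfig(kafkaPort: Int, kafkaDockerPort: Int, dockerHost: String): Map[String, String] =
    Map(
      // Addresses the broker binds to (assumed: loopback for local, all interfaces for Docker)
      "listeners" -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://0.0.0.0:$kafkaDockerPort",
      // Addresses handed back to clients; each client keeps using the listener it connected on
      "advertised.listeners" -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://$dockerHost:$kafkaDockerPort",
      // Both listeners use plain, unencrypted connections in this sketch
      "listener.security.protocol.map" -> "LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
      "inter.broker.listener.name" -> "LISTENER_LOCAL")
}
```

For example, `KafkaListeners.listenerConfig(9092, 9091, "host.docker.internal")` would advertise `localhost:9092` to in-JVM clients and `host.docker.internal:9091` to containers, assuming that hostname resolves inside the container.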
👍 Done
val kafkaPort = opt[Int](
  descr = "Kafka port. If not specified then 9092 or some random free port (if 9092 is busy) would be used",
  noshort = true,
  required = false)
Would it make sense to add a `default = Some(9092)` here?
This is done on purpose. Port 9092 is more like a "preferred" port. If no explicit option is provided, the system tries to start Kafka at 9092 by default. However, if that port is busy, a random port is selected.
This ensures that Standalone OpenWhisk requires as few fixed free ports as possible (for now only 3233). All other services can be started on any port, and dependent services are then configured with the randomly selected port.
If a user explicitly provides a port like `--kafka-port 9010`, Kafka is started at that port. If the port is busy, Kafka fails to start. This support is mostly meant for tests that need to know in advance where Kafka will start.
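The rule described here can be sketched roughly as follows. This is not the PR's actual `getPort` implementation; `PortSelection`, `isFree`, `randomFreePort` and `resolvePort` are hypothetical names, and the sketch assumes a port counts as free when a `java.net.ServerSocket` can bind to it.

```scala
import java.net.ServerSocket
import scala.util.Try

object PortSelection {
  // True if a server socket can currently be bound to the given port.
  def isFree(port: Int): Boolean =
    Try(new ServerSocket(port)).map(_.close()).isSuccess

  // Binding to port 0 asks the OS to pick any free port.
  def randomFreePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }

  // An explicit port is returned unchecked, so the service fails at startup if it is busy.
  // Without an explicit port, the preferred port is used when free, else a random one.
  def resolvePort(explicit: Option[Int], preferred: Int): Int =
    explicit.getOrElse(if (isFree(preferred)) preferred else randomFreePort())
}
```

With this shape, `PortSelection.resolvePort(Some(9010), 9092)` always yields 9010, while `PortSelection.resolvePort(None, 9092)` yields 9092 only if nothing else is bound to it. Note that a check like this is inherently racy: another process could grab the port between the probe and the actual service start.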
I understand. The background of my proposal was that there is some duplication of the default ports. We have the port number 9092 for Kafka here AND in `startKafka()`. Wouldn't it make sense to define a `val defaultKafkaPort = 9092` and use it to build the description string and in `startKafka()`?
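A small sketch of that suggestion, under stated assumptions: only `defaultKafkaPort` is named in the comment above; `StandalonePorts`, `defaultKafkaDockerPort`, `defaultZkPort` and `portDescr` are hypothetical names, and the 9091 and 2181 values are taken from the option descriptions in this PR.

```scala
object StandalonePorts {
  // Single source of truth for the preferred ports
  val defaultKafkaPort = 9092
  val defaultKafkaDockerPort = 9091 // hypothetical name
  val defaultZkPort = 2181 // hypothetical name

  // Hypothetical helper: builds the option description from the constant,
  // so the literal no longer appears in the help text and the startup code separately.
  def portDescr(service: String, default: Int): String =
    s"$service port. If not specified then $default or some random free port (if $default is busy) would be used"
}
```

The option definition could then pass `StandalonePorts.portDescr("Kafka", StandalonePorts.defaultKafkaPort)` as its `descr`, and the startup code could reuse the same constant as the preferred port instead of repeating `9092`.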
👍 Done that now
  descr = "Kafka port for use by docker based services. If not specified then 9091 or some random free port " +
    "(if 9091 is busy) would be used",
  noshort = true,
  required = false)
Would it make sense to add a `default = Some(9091)` here?
val zkPort = opt[Int](
  descr = "Zookeeper port. If not specified then 2181 or some random free port (if 2181 is busy) would be used",
  noshort = true,
  required = false)
Would it make sense to add a `default = Some(2181)` here?
  val dir = new File(workDir, name)
  FileUtils.forceMkdir(dir)
  Directory(dir)
}
Is embedded Kafka writing topic message logs to this work directory? If yes, do we clean up this data when shutting the standalone openwhisk down? If not, there is a chance that this directory grows large depending on Kafka retention policy and number of activations... If this data is only cleaned up on system reboot, the directory can grow large over time.
Currently the directory is cleaned when the standalone jar is restarted (not on system restart). For now, the files are not deleted on exit.
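For readers wondering what "cleaned upon restart" could look like, here is a minimal sketch using only the JDK. It is not the PR's code (which uses `FileUtils`); `WorkDir`, `freshDir` and `deleteRecursively` are hypothetical names, and the sketch assumes the whole per-service directory can safely be wiped at startup.

```scala
import java.io.File
import java.nio.file.Files

object WorkDir {
  // Deletes a file, or a directory together with everything below it.
  def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, hence the Option wrapper
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Returns an empty directory `name` under `workDir`,
  // discarding any data left behind by a previous run.
  def freshDir(workDir: File, name: String): File = {
    val dir = new File(workDir, name)
    if (dir.exists()) deleteRecursively(dir)
    Files.createDirectories(dir.toPath)
    dir
  }
}
```

Cleaning on start rather than on exit means data from the last run stays available for inspection after shutdown, at the cost of leaving files on disk until the next launch.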
  as: ActorSystem,
  ec: ExecutionContext,
  materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
  val kafkaPort = getPort(conf.kafkaPort.toOption, 9092)
If we add a default value to the `kafkaPort` option, we can probably solve this differently and not repeat the port number literal here?
Ack. Done as you suggested by moving the defaults to constants and reusing them across the various places.
@@ -93,6 +95,7 @@ trait StandaloneServerFixture extends TestSuite with BeforeAndAfterAll with Stre
      manifestFile.foreach(FileUtils.deleteQuietly)
      serverProcess.destroy()
    }
    FileUtils.forceDelete(new File(dataDirPath))
I guess this answers my question from above regarding data deletion when the process exits...
At least for tests...
Thanks for providing this pull request which enables developers to verify Kafka related scenarios with standalone openwhisk. Looks good to me - although I have to admit that I didn't try to run these changes and I never used standalone openwhisk up to now. So I may have missed something...
Overall, everything looks reasonable - only minor comments.
Thanks Sven for the review!
Sorry I didn't get to this sooner. LGTM.
java -jar openwhisk-standalone.jar --kafka
```

It also supports launching a Kafka UI based on [Kafdrop 3][6] which enables seeing the topics created and structure of messages
`[Kafdrop 3][6]` -> is this supposed to link to two different places, "3" and "6"?
No, that's the actual name: Kafdrop 3.
```

By default the ui server would be accessible at http://localhost:9000. In case 9000 port is busy then a random port would
be selected. TO find out the port look for message in log like below (or grep for `whisk-kafka-drop-ui`)
TO -> To
Enable support for Embedded Kafka to allow Kafka based feature development. It also supports launching a Kafdrop 3 based UI for visualizing Kafka related metadata.
Adds support for launching Embedded Kafka and Kafka Drop UI
Description
This PR adds support for launching an embedded Kafka. This is mostly meant to enable local development of features which need Kafka support (like User Event or the Activation Persister Service). In this mode an in-process Kafka Server (v2.1.1 currently) and Zookeeper Server are launched.
Key aspects
`~/.openwhisk/standalone/server-3233/tmp` and are removed upon restart
Once launched you can see the details of port used etc as below
Kafka Ui - Kafdrop 3
This PR also adds support for launching an optional Kafdrop 3
This UI can help a developer to understand the structure of messages exchanged on various topics easily