Embedded Kafka support in OpenWhisk Standalone mode #4628

Merged
merged 11 commits into from
Sep 19, 2019
57 changes: 45 additions & 12 deletions core/standalone/README.md
@@ -75,22 +75,37 @@ $ java -jar openwhisk-standalone.jar -h
\ \ / \/ \___/| .__/ \___|_| |_|__/\__|_| |_|_|___/_|\_\
\___\/ tm |_|

--api-gw Enable API Gateway support
--api-gw-port <arg> Api Gateway Port
--clean Clean any existing state like database
-c, --config-file <arg> application.conf which overrides the default
standalone.conf
--couchdb Enable CouchDB support
-d, --data-dir <arg> Directory used for storage
--disable-color-logging Disables colored logging
-m, --manifest <arg> Manifest json defining the supported runtimes
-p, --port <arg> Server port
--api-gw Enable API Gateway support
--api-gw-port <arg> Api Gateway Port
--clean Clean any existing state like database
-c, --config-file <arg> application.conf which overrides the default
standalone.conf
--couchdb Enable CouchDB support
-d, --data-dir <arg> Directory used for storage
--dev-mode Developer mode speeds up the startup by
disabling preflight checks and avoiding
explicit pulls.
--disable-color-logging Disables colored logging
--kafka Enable embedded Kafka support
--kafka-docker-port <arg> Kafka port for use by docker based services.
If not specified then 9091 or some random
free port (if 9091 is busy) would be used
--kafka-port <arg> Kafka port. If not specified then 9092 or
some random free port (if 9092 is busy) would
be used
--kafka-ui Enable Kafka UI
-m, --manifest <arg> Manifest json defining the supported runtimes
-p, --port <arg> Server port
-v, --verbose
-h, --help Show help message
--version Show version of this program
--zk-port <arg> Zookeeper port. If not specified then 2181 or
some random free port (if 2181 is busy) would
be used
-h, --help Show help message
--version Show version of this program

OpenWhisk standalone server


```

Sections below would illustrate some of the supported options
@@ -204,7 +219,25 @@ Api Gateway mode can be enabled via `--api-gw` flag. In this mode upon launch a
would be launched on port `3234` (can be changed with `--api-gw-port`). In this mode you can make use of the
[api gateway][4] support.

#### Using Kafka

Standalone OpenWhisk supports launching an [embedded Kafka][5]. This mode is mostly useful for developers working on the
OpenWhisk implementation itself.

```
java -jar openwhisk-standalone.jar --kafka
```

It also supports launching a Kafka UI based on [Kafdrop 3][6], which lets you see the topics created and the structure of messages
Member

[Kafdrop 3][6] -> is this supposed to link to two different places, "3" and "6"?

Member Author

No, that's the actual name: Kafdrop 3.

exchanged on those topics.

```
java -jar openwhisk-standalone.jar --kafka --kafka-ui
```

[1]: https://github.com/apache/incubator-openwhisk/blob/master/docs/cli.md
[2]: https://github.com/apache/incubator-openwhisk/blob/master/docs/samples.md
[3]: https://github.com/apache/incubator-openwhisk-apigateway
[4]: https://github.com/apache/incubator-openwhisk/blob/master/docs/apigateway.md
[5]: https://github.com/embeddedkafka/embedded-kafka
[6]: https://github.com/obsidiandynamics/kafdrop
5 changes: 5 additions & 0 deletions core/standalone/build.gradle
@@ -131,6 +131,11 @@ dependencies {
compile project(':tools:admin')
compile 'org.rogach:scallop_2.12:3.3.1'

// Tried 0.16.0, which supports Kafka 0.11.0: https://github.com/embeddedkafka/embedded-kafka/tree/v0.16.0
// But that causes class compatibility issues due to the use of a newer client version
compile ("io.github.embeddedkafka:embedded-kafka_2.12:2.1.1")
compile ("org.scala-lang:scala-reflect:${gradle.scala.version}")

testCompile 'junit:junit:4.11'
testCompile 'org.scalatest:scalatest_2.12:3.0.5'
}
2 changes: 2 additions & 0 deletions core/standalone/src/main/resources/logback-standalone.xml
@@ -29,6 +29,8 @@

<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
Member

I guess the existing definition for org.apache.kafka is for Kafka clients (consumer, producer)?

Member Author

Yes

<logger name="org.apache.zookeeper" level="ERROR" />
<logger name="kafka.server" level="ERROR" />

<root level="${logback.log.level:-INFO}">
<appender-ref ref="console" />
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.standalone

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import kafka.server.KafkaConfig
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.commons.io.FileUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.io.Directory
import scala.util.Try

class KafkaLauncher(docker: StandaloneDockerClient,
kafkaPort: Int,
kafkaDockerPort: Int,
zkPort: Int,
workDir: File,
kafkaUi: Boolean)(implicit logging: Logging,
ec: ExecutionContext,
actorSystem: ActorSystem,
materializer: ActorMaterializer,
tid: TransactionId) {

def run(): Future[Seq[ServiceContainer]] = {
for {
kafkaSvcs <- runKafka()
uiSvcs <- if (kafkaUi) runKafkaUI() else Future.successful(Seq.empty[ServiceContainer])
} yield kafkaSvcs ++ uiSvcs
}

def runKafka(): Future[Seq[ServiceContainer]] = {

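// The settings below are based on https://rmoff.net/2018/08/02/kafka-listeners-explained/
// We configure two listeners so that both host based and Docker based applications can connect to the Kafka
// server running on the host. LISTENER_LOCAL is for the controller and invoker, which run in the same JVM as the
// embedded Kafka. LISTENER_DOCKER is for services running in Docker containers, such as the Kafka UI.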
Member

It would be helpful to add some more detail - for example, controller / invoker will use LISTENER_LOCAL since they run in the same JVM as the embedded Kafka and Kafka UI will run in a Docker container and use LISTENER_DOCKER.

Member Author

👍 Done
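
To make the two-listener setup concrete, here is a minimal sketch that builds the same broker properties with plain string keys, so it runs without Kafka on the classpath. The keys `listeners`, `advertised.listeners`, `listener.security.protocol.map` and `inter.broker.listener.name` are the standard Kafka broker setting names; the object `ListenerProps`, the helper `brokerProps` and the sample IP `192.168.1.10` are assumptions made up for illustration and are not part of this PR.

```scala
object ListenerProps {

  // Hypothetical helper: builds broker properties for one host-facing and one Docker-facing listener.
  def brokerProps(kafkaPort: Int, kafkaDockerPort: Int, hostIp: String): Map[String, String] = Map(
    // Both listeners are bound on the host where the embedded broker runs.
    "listeners" -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://localhost:$kafkaDockerPort",
    // Clients are told to come back on these addresses: in-JVM clients (controller, invoker)
    // use localhost, Docker-based clients (such as the Kafka UI) use the host IP.
    "advertised.listeners" -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://$hostIp:$kafkaDockerPort",
    // Neither listener uses TLS or SASL.
    "listener.security.protocol.map" -> "LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
    "inter.broker.listener.name" -> "LISTENER_LOCAL")

  def main(args: Array[String]): Unit =
    brokerProps(9092, 9091, "192.168.1.10").foreach { case (k, v) => println(s"$k=$v") }
}
```

A container that connects to `192.168.1.10:9091` would then receive an advertised address it can actually reach, whereas `localhost` inside the container would point at the container itself.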

val brokerProps = Map(
KafkaConfig.ListenersProp -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://localhost:$kafkaDockerPort",
KafkaConfig.AdvertisedListenersProp -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://${StandaloneDockerSupport
.getLocalHostIp()}:$kafkaDockerPort",
KafkaConfig.ListenerSecurityProtocolMapProp -> "LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
KafkaConfig.InterBrokerListenerNameProp -> "LISTENER_LOCAL")
implicit val config: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps)

val t = Try {
EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
EmbeddedKafka.startKafka(createDir("kafka"))
}

Future
.fromTry(t)
.map(
_ =>
Seq(
ServiceContainer(kafkaPort, s"localhost:$kafkaPort", "kafka"),
ServiceContainer(
kafkaDockerPort,
s"${StandaloneDockerSupport.getLocalHostIp()}:$kafkaDockerPort",
"kafka-docker"),
ServiceContainer(zkPort, "Zookeeper", "zookeeper")))
}

def runKafkaUI(): Future[Seq[ServiceContainer]] = {
val hostIp = StandaloneDockerSupport.getLocalHostIp()
val port = checkOrAllocatePort(9000)
val env = Map(
"ZOOKEEPER_CONNECT" -> s"$hostIp:$zkPort",
"KAFKA_BROKERCONNECT" -> s"$hostIp:$kafkaDockerPort",
"JVM_OPTS" -> "-Xms32M -Xmx64M",
"SERVER_SERVLET_CONTEXTPATH" -> "/")

logging.info(this, s"Starting Kafka Drop UI port: $port")
val name = containerName("kafka-drop-ui")
val params = Map("-p" -> Set(s"$port:9000"))
val args = createRunCmd(name, env, params)

val f = docker.runDetached("obsidiandynamics/kafdrop", args, true)
f.map(_ => Seq(ServiceContainer(port, s"http://localhost:$port", name)))
}

private def createDir(name: String) = {
val dir = new File(workDir, name)
FileUtils.forceMkdir(dir)
Directory(dir)
}
Member

Is embedded Kafka writing topic message logs to this work directory? If yes, do we clean up this data when shutting the standalone openwhisk down? If not, there is a chance that this directory grows large depending on Kafka retention policy and number of activations... If this data is only cleaned up on system reboot, the directory can grow large over time.

Member Author

Currently the directory is cleaned when the standalone jar restarts (not on system restart). For now, the files are not deleted on exit.
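
For illustration only, the two cleanup strategies discussed here could look roughly like the sketch below. It uses only the JDK; the object `WorkDirCleanup` and the helpers `deleteRecursively` and `freshDir` are hypothetical names, not code from this PR. The shutdown hook shows the delete-on-exit variant that the PR deliberately does not implement.

```scala
import java.io.File
import java.nio.file.Files

object WorkDirCleanup {

  // Deletes a file or a whole directory tree; a missing path is a no-op.
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete() // result ignored: false simply means there was nothing to delete
  }

  // "Clean on restart": wipe any data left by a previous run, then recreate an empty directory.
  def freshDir(workDir: File, name: String): File = {
    val dir = new File(workDir, name)
    deleteRecursively(dir)
    dir.mkdirs()
    dir
  }

  def main(args: Array[String]): Unit = {
    val workDir = Files.createTempDirectory("standalone").toFile
    val kafkaDir = freshDir(workDir, "kafka")
    // "Clean on exit" (hypothetical): remove the data when the JVM shuts down normally.
    sys.addShutdownHook(deleteRecursively(workDir))
    println(s"Kafka data dir: $kafkaDir")
  }
}
```

Cleaning on start keeps the data around for inspection after a crash or shutdown, which can be handy when debugging, at the cost of leaving topic logs on disk until the next launch.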

}

object KafkaAwareLeanBalancer extends LoadBalancerProvider {
override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts

override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = LeanBalancer.instance(whiskConfig, instance)
}
@@ -32,6 +32,7 @@ import org.apache.openwhisk.core.cli.WhiskAdmin
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.standalone.ColorOutput.clr
import org.apache.openwhisk.standalone.StandaloneDockerSupport.checkOrAllocatePort
import org.rogach.scallop.ScallopConf
import pureconfig.loadConfigOrThrow

@@ -63,6 +64,25 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val apiGwPort = opt[Int](descr = "Api Gateway Port", default = Some(3234), noshort = true)
val dataDir = opt[File](descr = "Directory used for storage", default = Some(StandaloneOpenWhisk.defaultWorkDir))

val kafka = opt[Boolean](descr = "Enable embedded Kafka support", noshort = true)
val kafkaUi = opt[Boolean](descr = "Enable Kafka UI", noshort = true)

val kafkaPort = opt[Int](
descr = "Kafka port. If not specified then 9092 or some random free port (if 9092 is busy) would be used",
noshort = true,
required = false)
Member

Would it make sense to add a default = Some(9092) here?

Member Author

This is done on purpose. Port 9092 is a "preferred" port rather than a fixed default. If no explicit option is provided, the system tries to start Kafka on 9092; if that port is busy, a random free port is selected.

This ensures that Standalone OpenWhisk needs the minimum number of fixed free ports (for now only 3233). All other services can start on any port, and dependent services are then configured with the randomly selected port.

If a user explicitly provides a port, for example `--kafka-port 9010`, Kafka is started on that port, and if the port is busy Kafka fails to start. This support is mostly meant for tests that need to know where Kafka will start.

Member

I understand. Background of my proposal was that there is some duplication of the default ports. We have the port number 9092 for Kafka here AND in startKafka(). Wouldn't it make sense to define a val defaultKafkaPort = 9092 and use it to build the description string and in startKafka()?

Member Author

👍 Done that now
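
The "preferred port" behaviour and the shared constants agreed on in this thread can be sketched as follows. This is not the code from the PR: the real `checkOrAllocatePort` lives in `StandaloneDockerSupport` and is not shown in this diff, so the version below is a stand-in built on `java.net.ServerSocket`, and the names `PortSupport`, `preferredKafkaPort`, `isFree`, `freePort` and `portDescr` are assumptions for illustration.

```scala
import java.net.ServerSocket
import scala.util.Try

object PortSupport {

  // Hypothetical constants: defined once, then reused for option descriptions and startup.
  val preferredKafkaPort = 9092
  val preferredKafkaDockerPort = 9091
  val preferredZkPort = 2181

  // True if a server socket can currently be bound to the port.
  def isFree(port: Int): Boolean = Try(new ServerSocket(port).close()).isSuccess

  // Asks the OS for any free port by binding to port 0.
  def freePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }

  // Stand-in for checkOrAllocatePort: use the preferred port if free, else a random free one.
  def checkOrAllocatePort(preferred: Int): Int = if (isFree(preferred)) preferred else freePort()

  // An explicitly configured port is returned as is, with no fallback,
  // so the service fails to start later if that port is busy.
  def getPort(configured: Option[Int], preferred: Int): Int =
    configured.getOrElse(checkOrAllocatePort(preferred))

  // Builds the help text from the same constant, so the number is not repeated as a literal.
  def portDescr(service: String, preferred: Int): String =
    s"$service port. If not specified then $preferred or some random free port (if $preferred is busy) would be used"

  def main(args: Array[String]): Unit = {
    println(portDescr("Kafka", preferredKafkaPort))
    println(s"explicit port: ${getPort(Some(9010), preferredKafkaPort)}")
    println(s"preferred or random port: ${getPort(None, preferredKafkaPort)}")
  }
}
```

Leaving the option without a `default` keeps "the user asked for this port" distinguishable from "nothing was specified", which is what allows the fallback to apply only in the second case.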


val kafkaDockerPort = opt[Int](
descr = "Kafka port for use by docker based services. If not specified then 9091 or some random free port " +
"(if 9091 is busy) would be used",
noshort = true,
required = false)
Member

Would it make sense to add a default = Some(9091) here?


val zkPort = opt[Int](
descr = "Zookeeper port. If not specified then 2181 or some random free port (if 2181 is busy) would be used",
noshort = true,
required = false)
Member

Would it make sense to add a default = Some(2181) here?


verify()

val colorEnabled = !disableColorLogging()
@@ -148,8 +168,12 @@ object StandaloneOpenWhisk extends SLF4JLogging {
startApiGateway(conf, dockerClient, dockerSupport)
} else (-1, Seq.empty)

val (kafkaPort, kafkaSvcs) = if (conf.kafka()) {
startKafka(workDir, dockerClient, conf, conf.kafkaUi())
} else (-1, Seq.empty)

val couchSvcs = if (conf.couchdb()) Some(startCouchDb(dataDir, dockerClient)) else None
val svcs = Seq(apiGwSvcs, couchSvcs.toList).flatten
val svcs = Seq(apiGwSvcs, couchSvcs.toList, kafkaSvcs).flatten
if (svcs.nonEmpty) {
new ServiceInfoLogger(conf, svcs, dataDir).run()
}
@@ -380,7 +404,7 @@ object StandaloneOpenWhisk extends SLF4JLogging {
ec: ExecutionContext,
materializer: ActorMaterializer): ServiceContainer = {
implicit val tid: TransactionId = TransactionId(systemPrefix + "couchDB")
val port = StandaloneDockerSupport.checkOrAllocatePort(5984)
val port = checkOrAllocatePort(5984)
val dbDataDir = new File(dataDir, "couchdb")
FileUtils.forceMkdir(dbDataDir)
val db = new CouchDBLauncher(dockerClient, port, dbDataDir)
@@ -393,6 +417,42 @@ object StandaloneOpenWhisk extends SLF4JLogging {
Await.result(g, 5.minutes)
}

private def startKafka(workDir: File, dockerClient: StandaloneDockerClient, conf: Conf, kafkaUi: Boolean)(
implicit logging: Logging,
as: ActorSystem,
ec: ExecutionContext,
materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
val kafkaPort = getPort(conf.kafkaPort.toOption, 9092)
Member

If we add a default value to the kafkaPort option, can probably solve this differently and not repeat the port number literal here?

Member Author

Ack. Done as you suggested, by moving the defaults to constants and reusing them in the various places.

implicit val tid: TransactionId = TransactionId(systemPrefix + "kafka")
val k = new KafkaLauncher(
dockerClient,
kafkaPort,
getPort(conf.kafkaDockerPort.toOption, kafkaPort - 1),
getPort(conf.zkPort.toOption, 2181),
workDir,
kafkaUi)

val f = k.run()
val g = f.andThen {
case Success(_) =>
logging.info(
this,
s"Kafka started successfully at http://${StandaloneDockerSupport.getLocalHostName()}:$kafkaPort")
case Failure(t) =>
logging.error(this, s"Error starting Kafka: $t")
}
val services = Await.result(g, 5.minutes)

setConfigProp(WhiskConfig.kafkaHostList, s"localhost:$kafkaPort")
setSysProp("whisk.spi.MessagingProvider", "org.apache.openwhisk.connector.kafka.KafkaMessagingProvider")
setSysProp("whisk.spi.LoadBalancerProvider", "org.apache.openwhisk.standalone.KafkaAwareLeanBalancer")
(kafkaPort, services)
}

private def getPort(configured: Option[Int], preferred: Int): Int = {
configured.getOrElse(checkOrAllocatePort(preferred))
}

private def configureDevMode(): Unit = {
setSysProp("whisk.docker.standalone.container-factory.pull-standard-images", "false")
}
@@ -18,30 +18,21 @@
package org.apache.openwhisk.standalone

import common.WskProps
import org.apache.commons.io.{FileUtils, FilenameUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Canceled, Outcome}
import system.basic.WskRestBasicTests

@RunWith(classOf[JUnitRunner])
class StandaloneCouchTests extends WskRestBasicTests with StandaloneServerFixture {
class StandaloneCouchTests extends WskRestBasicTests with StandaloneServerFixture with StandaloneSanityTestSupport {
override implicit val wskprops = WskProps().copy(apihost = serverUrl)

override protected def extraArgs: Seq[String] =
Seq("--couchdb", "--data-dir", FilenameUtils.concat(FileUtils.getTempDirectoryPath, "standalone"))
Seq("--couchdb")

override protected def extraVMArgs: Seq[String] = Seq("-Dwhisk.standalone.couchdb.volumes-enabled=false")

//This is more of a sanity test. So just run one of the test which trigger interaction with couchdb
//and skip running all other tests
private val supportedTests = Set("Wsk Action REST should create, update, get and list an action")

override def withFixture(test: NoArgTest): Outcome = {
if (supportedTests.contains(test.name)) {
super.withFixture(test)
} else {
Canceled()
}
}
override protected def supportedTests: Set[String] =
Set("Wsk Action REST should create, update, get and list an action")
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.standalone

import common.WskProps
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import system.basic.WskRestBasicTests

@RunWith(classOf[JUnitRunner])
class StandaloneKafkaTests extends WskRestBasicTests with StandaloneServerFixture with StandaloneSanityTestSupport {
override implicit val wskprops = WskProps().copy(apihost = serverUrl)

override protected def supportedTests = Set("Wsk Action REST should invoke a blocking action and get only the result")

override protected def extraArgs: Seq[String] = Seq("--kafka")
}