Embedded Kafka support in OpenWhisk Standalone mode #4628
@@ -29,6 +29,8 @@
<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
Review comment: I guess the existing definition for …
Reply: Yes
<logger name="org.apache.zookeeper" level="ERROR" />
<logger name="kafka.server" level="ERROR" />

<root level="${logback.log.level:-INFO}">
  <appender-ref ref="console" />
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.openwhisk.standalone

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import kafka.server.KafkaConfig
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.commons.io.FileUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.io.Directory
import scala.util.Try

class KafkaLauncher(docker: StandaloneDockerClient,
                    kafkaPort: Int,
                    kafkaDockerPort: Int,
                    zkPort: Int,
                    workDir: File,
                    kafkaUi: Boolean)(implicit logging: Logging,
                                      ec: ExecutionContext,
                                      actorSystem: ActorSystem,
                                      materializer: ActorMaterializer,
                                      tid: TransactionId) {

  def run(): Future[Seq[ServiceContainer]] = {
    for {
      kafkaSvcs <- runKafka()
      uiSvcs <- if (kafkaUi) runKafkaUI() else Future.successful(Seq.empty[ServiceContainer])
    } yield kafkaSvcs ++ uiSvcs
  }

  def runKafka(): Future[Seq[ServiceContainer]] = {

    // Below settings are based on https://rmoff.net/2018/08/02/kafka-listeners-explained/
    // We configure two listeners: one is used by host-based applications, and the other is used by
    // Docker-based applications to connect to the Kafka server running on the host.
Review comment: It would be helpful to add some more detail - for example, controller / invoker will use …
Reply: 👍 Done
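To make the intent of the two listeners concrete, here is a minimal sketch (illustrative only; the port values and host IP below are assumptions, and in the PR the host IP comes from StandaloneDockerSupport.getLocalHostIp()): host-local processes such as the controller and invoker connect through LISTENER_LOCAL on localhost, while Docker-based services connect through LISTENER_DOCKER advertised on the host IP.

object ListenerUsageSketch {
  // Assumed values for illustration; the real ports are chosen at startup.
  val kafkaPort = 9092        // LISTENER_LOCAL port, bound on localhost
  val kafkaDockerPort = 9091  // LISTENER_DOCKER port, advertised on the host IP
  val hostIp = "192.168.1.10" // placeholder for StandaloneDockerSupport.getLocalHostIp()

  // Controller/invoker running on the host connect via localhost
  val hostClientBootstrap = s"localhost:$kafkaPort"

  // Services running inside Docker (e.g. the Kafdrop UI) connect via the host IP
  val dockerClientBootstrap = s"$hostIp:$kafkaDockerPort"
}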
    val brokerProps = Map(
      KafkaConfig.ListenersProp -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://localhost:$kafkaDockerPort",
      KafkaConfig.AdvertisedListenersProp -> s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://${StandaloneDockerSupport
        .getLocalHostIp()}:$kafkaDockerPort",
      KafkaConfig.ListenerSecurityProtocolMapProp -> "LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
      KafkaConfig.InterBrokerListenerNameProp -> "LISTENER_LOCAL")
    implicit val config: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps)

    val t = Try {
      EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
      EmbeddedKafka.startKafka(createDir("kafka"))
    }

    Future
      .fromTry(t)
      .map(
        _ =>
          Seq(
            ServiceContainer(kafkaPort, s"localhost:$kafkaPort", "kafka"),
            ServiceContainer(
              kafkaDockerPort,
              s"${StandaloneDockerSupport.getLocalHostIp()}:$kafkaDockerPort",
              "kafka-docker"),
            ServiceContainer(zkPort, "Zookeeper", "zookeeper")))
  }
  def runKafkaUI(): Future[Seq[ServiceContainer]] = {
    val hostIp = StandaloneDockerSupport.getLocalHostIp()
    val port = checkOrAllocatePort(9000)
    val env = Map(
      "ZOOKEEPER_CONNECT" -> s"$hostIp:$zkPort",
      "KAFKA_BROKERCONNECT" -> s"$hostIp:$kafkaDockerPort",
      "JVM_OPTS" -> "-Xms32M -Xmx64M",
      "SERVER_SERVLET_CONTEXTPATH" -> "/")

    logging.info(this, s"Starting Kafka Drop UI port: $port")
    val name = containerName("kafka-drop-ui")
    val params = Map("-p" -> Set(s"$port:9000"))
    val args = createRunCmd(name, env, params)

    val f = docker.runDetached("obsidiandynamics/kafdrop", args, true)
    f.map(_ => Seq(ServiceContainer(port, s"http://localhost:$port", name)))
  }

  private def createDir(name: String) = {
    val dir = new File(workDir, name)
    FileUtils.forceMkdir(dir)
    Directory(dir)
  }
Review comment: Is embedded Kafka writing topic message logs to this work directory? If yes, do we clean up this data when shutting the standalone OpenWhisk down? If not, there is a chance that this directory grows large depending on the Kafka retention policy and number of activations. If this data is only cleaned up on system reboot, the directory can grow large over time.
Reply: Currently the dir would be cleaned upon standalone jar restart (not system restart). For now not deleting the files upon exit.
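For reference, the cleanup-on-restart behaviour mentioned in the reply could be sketched as follows. This is a hypothetical helper, not the PR's implementation (the PR's createDir only creates the directory); the object and method names are assumptions.

import java.io.File
import org.apache.commons.io.FileUtils

object WorkDirCleanupSketch {
  // Hypothetical: recreate a data directory on startup so stale Kafka/ZooKeeper
  // topic logs from a previous run do not accumulate across restarts.
  def recreateDir(workDir: File, name: String): File = {
    val dir = new File(workDir, name)
    if (dir.exists()) FileUtils.deleteDirectory(dir) // drop old topic logs and snapshots
    FileUtils.forceMkdir(dir)
    dir
  }
}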
}

object KafkaAwareLeanBalancer extends LoadBalancerProvider {
  override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts

  override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
    implicit actorSystem: ActorSystem,
    logging: Logging,
    materializer: ActorMaterializer): LoadBalancer = LeanBalancer.instance(whiskConfig, instance)
}
@@ -32,6 +32,7 @@ import org.apache.openwhisk.core.cli.WhiskAdmin
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.standalone.ColorOutput.clr
import org.apache.openwhisk.standalone.StandaloneDockerSupport.checkOrAllocatePort
import org.rogach.scallop.ScallopConf
import pureconfig.loadConfigOrThrow

@@ -63,6 +64,25 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
  val apiGwPort = opt[Int](descr = "Api Gateway Port", default = Some(3234), noshort = true)
  val dataDir = opt[File](descr = "Directory used for storage", default = Some(StandaloneOpenWhisk.defaultWorkDir))

  val kafka = opt[Boolean](descr = "Enable embedded Kafka support", noshort = true)
  val kafkaUi = opt[Boolean](descr = "Enable Kafka UI", noshort = true)

  val kafkaPort = opt[Int](
    descr = "Kafka port. If not specified then 9092 or some random free port (if 9092 is busy) would be used",
    noshort = true,
    required = false)
Review comment: Would it make sense to add a …
Reply: This is done on purpose. Port 9092 is more like a "preferred" port. If no explicit option is provided, the system tries to start Kafka at 9092 by default; if that port is busy, a random free port is selected. This is done to ensure that Standalone OpenWhisk needs only a minimum of required free ports (for now only 3233). All other services can be started on any port, and dependent services are then configured with the randomly selected port. If a user explicitly provides a port like …
Review comment: I understand. The background of my proposal was that there is some duplication of the default ports. We have the port number 9092 for Kafka here AND in …
Reply: 👍 Done that now
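The "preferred port" behaviour described in the reply can be sketched roughly as follows (illustrative only; the real logic lives in StandaloneDockerSupport.checkOrAllocatePort and may differ): try the preferred port first, and fall back to an OS-assigned free port when it is already taken.

import java.net.ServerSocket
import scala.util.Try

object PortAllocationSketch {
  private def isPortFree(port: Int): Boolean =
    Try(new ServerSocket(port)).map(_.close()).isSuccess

  // Use the preferred port when free, otherwise let the OS pick any free port (port 0).
  def checkOrAllocatePort(preferredPort: Int): Int =
    if (isPortFree(preferredPort)) preferredPort
    else {
      val socket = new ServerSocket(0)
      try socket.getLocalPort
      finally socket.close()
    }
}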
  val kafkaDockerPort = opt[Int](
    descr = "Kafka port for use by docker based services. If not specified then 9091 or some random free port " +
      "(if 9091 is busy) would be used",
    noshort = true,
    required = false)
Review comment: Would it make sense to add a …
  val zkPort = opt[Int](
    descr = "Zookeeper port. If not specified then 2181 or some random free port (if 2181 is busy) would be used",
    noshort = true,
    required = false)
Review comment: Would it make sense to add a …
  verify()

  val colorEnabled = !disableColorLogging()

@@ -148,8 +168,12 @@ object StandaloneOpenWhisk extends SLF4JLogging {
      startApiGateway(conf, dockerClient, dockerSupport)
    } else (-1, Seq.empty)

    val (kafkaPort, kafkaSvcs) = if (conf.kafka()) {
      startKafka(workDir, dockerClient, conf, conf.kafkaUi())
    } else (-1, Seq.empty)

    val couchSvcs = if (conf.couchdb()) Some(startCouchDb(dataDir, dockerClient)) else None
-   val svcs = Seq(apiGwSvcs, couchSvcs.toList).flatten
+   val svcs = Seq(apiGwSvcs, couchSvcs.toList, kafkaSvcs).flatten
    if (svcs.nonEmpty) {
      new ServiceInfoLogger(conf, svcs, dataDir).run()
    }
@@ -380,7 +404,7 @@
    ec: ExecutionContext,
    materializer: ActorMaterializer): ServiceContainer = {
    implicit val tid: TransactionId = TransactionId(systemPrefix + "couchDB")
-   val port = StandaloneDockerSupport.checkOrAllocatePort(5984)
+   val port = checkOrAllocatePort(5984)
    val dbDataDir = new File(dataDir, "couchdb")
    FileUtils.forceMkdir(dbDataDir)
    val db = new CouchDBLauncher(dockerClient, port, dbDataDir)
@@ -393,6 +417,42 @@
    Await.result(g, 5.minutes)
  }

  private def startKafka(workDir: File, dockerClient: StandaloneDockerClient, conf: Conf, kafkaUi: Boolean)(
    implicit logging: Logging,
    as: ActorSystem,
    ec: ExecutionContext,
    materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
    val kafkaPort = getPort(conf.kafkaPort.toOption, 9092)
Review comment: If we add a default value to the …
Reply: Ack. Done as you suggested by moving the defaults to constants and reusing them across the various places.
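A sketch of the constant-based defaults mentioned in the reply (the object and field names are assumptions, not the exact ones used in the final commits): keeping the preferred ports in one place lets the CLI option descriptions and startKafka share the same values.

object StandaloneKafkaDefaults {
  // Assumed names for illustration; the PR moves these defaults into shared constants.
  val preferredKafkaPort = 9092
  val preferredKafkaDockerPort = preferredKafkaPort - 1 // 9091
  val preferredZkPort = 2181
}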
    implicit val tid: TransactionId = TransactionId(systemPrefix + "kafka")
    val k = new KafkaLauncher(
      dockerClient,
      kafkaPort,
      getPort(conf.kafkaDockerPort.toOption, kafkaPort - 1),
      getPort(conf.zkPort.toOption, 2181),
      workDir,
      kafkaUi)

    val f = k.run()
    val g = f.andThen {
      case Success(_) =>
        logging.info(
          this,
          s"Kafka started successfully at http://${StandaloneDockerSupport.getLocalHostName()}:$kafkaPort")
      case Failure(t) =>
        logging.error(this, "Error starting Kafka" + t)
    }
    val services = Await.result(g, 5.minutes)

    setConfigProp(WhiskConfig.kafkaHostList, s"localhost:$kafkaPort")
    setSysProp("whisk.spi.MessagingProvider", "org.apache.openwhisk.connector.kafka.KafkaMessagingProvider")
    setSysProp("whisk.spi.LoadBalancerProvider", "org.apache.openwhisk.standalone.KafkaAwareLeanBalancer")
    (kafkaPort, services)
  }

  private def getPort(configured: Option[Int], preferred: Int): Int = {
    configured.getOrElse(checkOrAllocatePort(preferred))
  }

  private def configureDevMode(): Unit = {
    setSysProp("whisk.docker.standalone.container-factory.pull-standard-images", "false")
  }
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.openwhisk.standalone

import common.WskProps
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import system.basic.WskRestBasicTests

@RunWith(classOf[JUnitRunner])
class StandaloneKafkaTests extends WskRestBasicTests with StandaloneServerFixture with StandaloneSanityTestSupport {
  override implicit val wskprops = WskProps().copy(apihost = serverUrl)

  override protected def supportedTests = Set("Wsk Action REST should invoke a blocking action and get only the result")

  override protected def extraArgs: Seq[String] = Seq("--kafka")
}
Review comment: [Kafdrop 3][6] -> is this supposed to link to two different places, "3" and "6"?
Reply: No, that's the actual name, Kafdrop 3.