Skip to content

Commit

Permalink
Tidy Kafka config handling (#407)
Browse files Browse the repository at this point in the history
* Remove unused imports from admin client wrapper

* Add function to create Kafka admin properties from provided config

* Use Kafka admin config for admin client

* Add CLI handling for Kafka security protocol

* Pass security protocol to Kafka config objects

* Add comments describing logical sections of CLI config

* Remove unused core-count config

* Update local properties file

* Use replication factor CLI value in Kafka config

* Expect admin properties, not normal Kafka properties, in admin client wrapper

* Rename var for consistency

* Use Kafka streams config to set replication factor when creating topics

* Use Kafka streams config to set partition count when creating topics

* Add const modifier to value

* Add program name to CLI config parsing setup
  • Loading branch information
agrski authored Aug 22, 2022
1 parent d3a6df7 commit 511ec06
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 22 deletions.
10 changes: 7 additions & 3 deletions scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package io.seldon.dataflow

import com.natpryce.konfig.*
import io.klogging.noCoLogger
import org.apache.kafka.common.security.auth.SecurityProtocol

object Cli {
private const val envVarPrefix = "SELDON_"
private val logger = noCoLogger(Cli::class)

// Seldon components
val upstreamHost = Key("upstream.host", stringType)
val upstreamPort = Key("upstream.port", intType)

val numCores = Key("cores.count", intType)

// Kafka
private val supportedKafkaProtocols = arrayOf(SecurityProtocol.PLAINTEXT)
val kafkaBootstrapServers = Key("kafka.bootstrap.servers", stringType)
val kafkaSecurityProtocol = Key("kafka.security.protocol", enumType(*supportedKafkaProtocols))
val kafkaPartitions = Key("kafka.partitions.default", intType)
val kafkaReplicationFactor = Key("kafka.replication.factor", intType)
val kafkaUseCleanState = Key("kafka.state.clean", booleanType)
Expand All @@ -30,13 +33,14 @@ object Cli {
val (config, unparsedArgs) = parseArgs(
rawArgs,
CommandLineOption(kafkaBootstrapServers),
CommandLineOption(kafkaSecurityProtocol),
CommandLineOption(kafkaPartitions),
CommandLineOption(kafkaReplicationFactor),
CommandLineOption(kafkaUseCleanState),
CommandLineOption(kafkaJoinWindowMillis),
CommandLineOption(numCores),
CommandLineOption(upstreamHost),
CommandLineOption(upstreamPort),
programName = "seldon-dataflow-engine",
)
if (unparsedArgs.isNotEmpty()) {
logUnknownArguments(unparsedArgs)
Expand Down
15 changes: 10 additions & 5 deletions scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.seldon.dataflow
import io.klogging.noCoLogger
import io.seldon.dataflow.kafka.KafkaDomainParams
import io.seldon.dataflow.kafka.KafkaStreamsParams
import io.seldon.dataflow.kafka.getKafkaAdminProperties
import io.seldon.dataflow.kafka.getKafkaProperties
import kotlinx.coroutines.runBlocking

Expand All @@ -16,19 +17,23 @@ object Main {
val config = Cli.configWith(args)
logger.info("initialised")

val kafkaProperties = getKafkaProperties(
KafkaStreamsParams(
bootstrapServers = config[Cli.kafkaBootstrapServers],
numCores = config[Cli.numCores],
),
val kafkaStreamsParams = KafkaStreamsParams(
bootstrapServers = config[Cli.kafkaBootstrapServers],
securityProtocol = config[Cli.kafkaSecurityProtocol],
numPartitions = config[Cli.kafkaPartitions],
replicationFactor = config[Cli.kafkaReplicationFactor],
)
val kafkaProperties = getKafkaProperties(kafkaStreamsParams)
val kafkaAdminProperties = getKafkaAdminProperties(kafkaStreamsParams)
val kafkaDomainParams = KafkaDomainParams(
useCleanState = config[Cli.kafkaUseCleanState],
joinWindowMillis = config[Cli.kafkaJoinWindowMillis],
)
val subscriber = PipelineSubscriber(
"seldon-dataflow-engine",
kafkaProperties,
kafkaAdminProperties,
kafkaStreamsParams,
kafkaDomainParams,
config[Cli.upstreamHost],
config[Cli.upstreamPort],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import io.klogging.logger as coLogger
class PipelineSubscriber(
private val name: String,
private val kafkaProperties: KafkaProperties,
kafkaAdminProperties: KafkaAdminProperties,
kafkaStreamsParams: KafkaStreamsParams,
private val kafkaDomainParams: KafkaDomainParams,
private val upstreamHost: String,
private val upstreamPort: Int,
grpcServiceConfig: Map<String, Any>,
) {
private val kafkaAdmin = KafkaAdmin(kafkaProperties)
private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams)
private val channel = ManagedChannelBuilder
.forAddress(upstreamHost, upstreamPort)
.defaultServiceConfig(grpcServiceConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,35 @@ package io.seldon.dataflow.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.streams.StreamsConfig
import java.util.*

/**
 * Connection and topic-sizing configuration for Kafka clients built by this module.
 *
 * Consumed by [getKafkaProperties] (streams client) and [getKafkaAdminProperties]
 * (admin client); [numPartitions] and [replicationFactor] are also read when
 * creating topics.
 *
 * NOTE(review): this diff view shows both the removed `numCores` field and the
 * newly added fields in the same span — in the committed file `numCores` is
 * deleted (the `cores.count` config was removed). Confirm against the final
 * source before relying on this field list.
 */
data class KafkaStreamsParams(
    val bootstrapServers: String,
    val numCores: Int,
    val securityProtocol: SecurityProtocol,
    val numPartitions: Int,
    val replicationFactor: Int,
)

/**
 * Domain-level streaming behaviour settings, sourced from the
 * `kafka.state.clean` and `kafka.join.window.millis` CLI/config keys.
 *
 * @property useCleanState presumably controls cleaning of local streams state
 *   on start — semantics not visible here; confirm at the usage site.
 * @property joinWindowMillis join window duration in milliseconds.
 */
data class KafkaDomainParams(
    val useCleanState: Boolean,
    val joinWindowMillis: Long,
)

val KAFKA_MAX_MESSAGE_BYTES = 1_000_000_000
const val KAFKA_MAX_MESSAGE_BYTES = 1_000_000_000

// Per-topic overrides applied to every topic this service creates: raise the
// maximum message size to KAFKA_MAX_MESSAGE_BYTES.
val kafkaTopicConfig: Map<String, String> =
    mapOf(TopicConfig.MAX_MESSAGE_BYTES_CONFIG to KAFKA_MAX_MESSAGE_BYTES.toString())

/**
 * Builds the [Properties] used to construct a Kafka admin client.
 *
 * Only the bootstrap servers and security protocol are needed by the admin
 * client; the StreamsConfig key constants are reused because their string
 * values ("bootstrap.servers", "security.protocol") match the admin keys.
 *
 * @param params source of the connection settings.
 * @return properties suitable for `Admin.create`.
 */
fun getKafkaAdminProperties(params: KafkaStreamsParams): KafkaAdminProperties {
    val adminProperties = Properties()
    adminProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = params.bootstrapServers
    adminProperties[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = params.securityProtocol.toString()
    return adminProperties
}

fun getKafkaProperties(params: KafkaStreamsParams): KafkaProperties {
// See https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html

Expand All @@ -32,9 +42,9 @@ fun getKafkaProperties(params: KafkaStreamsParams): KafkaProperties {
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = params.bootstrapServers
this[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = "at_least_once"
this[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = 1
this[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = "PLAINTEXT"
this[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = params.securityProtocol.toString()
// Testing
this[StreamsConfig.REPLICATION_FACTOR_CONFIG] = 1
this[StreamsConfig.REPLICATION_FACTOR_CONFIG] = params.replicationFactor
this[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
this[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 1

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package io.seldon.dataflow.kafka

import io.seldon.dataflow.parallel
import io.seldon.mlops.chainer.ChainerOuterClass.PipelineStepUpdate
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.TopicExistsException
import java.util.concurrent.ExecutionException
import io.klogging.logger as coLogger

class KafkaAdmin(kafkaProperties: KafkaProperties) {
private val adminClient = Admin.create(kafkaProperties)
class KafkaAdmin(
adminConfig: KafkaAdminProperties,
private val streamsConfig: KafkaStreamsParams,
) {
private val adminClient = Admin.create(adminConfig)


suspend fun ensureTopicsExist(
Expand All @@ -27,7 +25,13 @@ class KafkaAdmin(kafkaProperties: KafkaProperties) {
.also {
logger.info("Topics found are $it")
}
.map { topicName -> NewTopic(topicName, 1, 1).configs(kafkaTopicConfig) }
.map { topicName ->
NewTopic(
topicName,
streamsConfig.numPartitions,
streamsConfig.replicationFactor.toShort(),
).configs(kafkaTopicConfig)
}
.run { adminClient.createTopics(this) }
.values()
.also { topicCreations ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.apache.kafka.streams.kstream.StreamJoined
import java.util.*

// Domain-meaningful aliases over general-purpose types.
typealias KafkaProperties = Properties
typealias KafkaAdminProperties = Properties // same underlying type as KafkaProperties; distinguishes admin-client config at call sites
typealias TopicName = String
typealias TensorName = String
typealias RequestId = String
Expand Down
2 changes: 1 addition & 1 deletion scheduler/data-flow/src/main/resources/local.properties
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
kafka.bootstrap.servers=localhost:9092
kafka.security.protocol=PLAINTEXT
kafka.partitions.default=1
kafka.replication.factor=1
kafka.state.clean=true
kafka.join.window.millis=60000
upstream.host=0.0.0.0
upstream.port=9008
cores.count=4

0 comments on commit 511ec06

Please sign in to comment.