diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
index bc01b3114b..425360d6a7 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
@@ -2,17 +2,20 @@ package io.seldon.dataflow
 
 import com.natpryce.konfig.*
 import io.klogging.noCoLogger
+import org.apache.kafka.common.security.auth.SecurityProtocol
 
 object Cli {
     private const val envVarPrefix = "SELDON_"
     private val logger = noCoLogger(Cli::class)
 
+    // Seldon components
     val upstreamHost = Key("upstream.host", stringType)
     val upstreamPort = Key("upstream.port", intType)
 
-    val numCores = Key("cores.count", intType)
-
+    // Kafka
+    private val supportedKafkaProtocols = arrayOf(SecurityProtocol.PLAINTEXT)
     val kafkaBootstrapServers = Key("kafka.bootstrap.servers", stringType)
+    val kafkaSecurityProtocol = Key("kafka.security.protocol", enumType(*supportedKafkaProtocols))
     val kafkaPartitions = Key("kafka.partitions.default", intType)
     val kafkaReplicationFactor = Key("kafka.replication.factor", intType)
     val kafkaUseCleanState = Key("kafka.state.clean", booleanType)
@@ -30,13 +33,14 @@ object Cli {
         val (config, unparsedArgs) = parseArgs(
             rawArgs,
             CommandLineOption(kafkaBootstrapServers),
+            CommandLineOption(kafkaSecurityProtocol),
             CommandLineOption(kafkaPartitions),
             CommandLineOption(kafkaReplicationFactor),
             CommandLineOption(kafkaUseCleanState),
             CommandLineOption(kafkaJoinWindowMillis),
-            CommandLineOption(numCores),
             CommandLineOption(upstreamHost),
             CommandLineOption(upstreamPort),
+            programName = "seldon-dataflow-engine",
         )
         if (unparsedArgs.isNotEmpty()) {
             logUnknownArguments(unparsedArgs)
diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
index 7aa1077541..1a44bf0093 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
@@ -3,6 +3,7 @@
 import io.klogging.noCoLogger
 import io.seldon.dataflow.kafka.KafkaDomainParams
 import io.seldon.dataflow.kafka.KafkaStreamsParams
+import io.seldon.dataflow.kafka.getKafkaAdminProperties
 import io.seldon.dataflow.kafka.getKafkaProperties
 import kotlinx.coroutines.runBlocking
 
@@ -16,12 +17,14 @@ object Main {
         val config = Cli.configWith(args)
         logger.info("initialised")
 
-        val kafkaProperties = getKafkaProperties(
-            KafkaStreamsParams(
-                bootstrapServers = config[Cli.kafkaBootstrapServers],
-                numCores = config[Cli.numCores],
-            ),
+        val kafkaStreamsParams = KafkaStreamsParams(
+            bootstrapServers = config[Cli.kafkaBootstrapServers],
+            securityProtocol = config[Cli.kafkaSecurityProtocol],
+            numPartitions = config[Cli.kafkaPartitions],
+            replicationFactor = config[Cli.kafkaReplicationFactor],
         )
+        val kafkaProperties = getKafkaProperties(kafkaStreamsParams)
+        val kafkaAdminProperties = getKafkaAdminProperties(kafkaStreamsParams)
         val kafkaDomainParams = KafkaDomainParams(
             useCleanState = config[Cli.kafkaUseCleanState],
             joinWindowMillis = config[Cli.kafkaJoinWindowMillis],
@@ -29,6 +32,8 @@
         val subscriber = PipelineSubscriber(
             "seldon-dataflow-engine",
             kafkaProperties,
+            kafkaAdminProperties,
+            kafkaStreamsParams,
             kafkaDomainParams,
             config[Cli.upstreamHost],
             config[Cli.upstreamPort],
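Note on the new `kafka.security.protocol` key: it is enum-typed, so konfig rejects any value outside `supportedKafkaProtocols` at startup rather than at first use. A minimal standalone sketch of how such a key resolves; the `SELDON_` prefix matches `envVarPrefix` above, but the exact resolution chain shown here is an assumption for illustration, not part of this change:

```kotlin
import com.natpryce.konfig.*
import org.apache.kafka.common.security.auth.SecurityProtocol

// Illustrative only: an enum-typed key like Cli.kafkaSecurityProtocol.
val securityProtocol = Key("kafka.security.protocol", enumType(SecurityProtocol.PLAINTEXT))

fun main() {
    // Assumed resolution order: env vars win over the bundled file,
    // e.g. SELDON_KAFKA_SECURITY_PROTOCOL=PLAINTEXT.
    val config = EnvironmentVariables(prefix = "SELDON_") overriding
        ConfigurationProperties.fromResource("local.properties")

    println(config[securityProtocol]) // PLAINTEXT
}
```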
diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt
index 139fe1ae8c..d76ab0208f 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt
@@ -30,12 +30,14 @@ import io.klogging.logger as coLogger
 class PipelineSubscriber(
     private val name: String,
     private val kafkaProperties: KafkaProperties,
+    kafkaAdminProperties: KafkaAdminProperties,
+    kafkaStreamsParams: KafkaStreamsParams,
     private val kafkaDomainParams: KafkaDomainParams,
     private val upstreamHost: String,
     private val upstreamPort: Int,
     grpcServiceConfig: Map<String, Any>,
 ) {
-    private val kafkaAdmin = KafkaAdmin(kafkaProperties)
+    private val kafkaAdmin = KafkaAdmin(kafkaAdminProperties, kafkaStreamsParams)
     private val channel = ManagedChannelBuilder
         .forAddress(upstreamHost, upstreamPort)
         .defaultServiceConfig(grpcServiceConfig)
diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Configuration.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Configuration.kt
index 12b81ae616..abfc3c1824 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Configuration.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Configuration.kt
@@ -3,12 +3,15 @@ package io.seldon.dataflow.kafka
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.streams.StreamsConfig
 import java.util.*
 
 data class KafkaStreamsParams(
     val bootstrapServers: String,
-    val numCores: Int,
+    val securityProtocol: SecurityProtocol,
+    val numPartitions: Int,
+    val replicationFactor: Int,
 )
 
 data class KafkaDomainParams(
@@ -16,12 +19,19 @@
     val joinWindowMillis: Long,
 )
 
-val KAFKA_MAX_MESSAGE_BYTES = 1_000_000_000
+const val KAFKA_MAX_MESSAGE_BYTES = 1_000_000_000
 
 val kafkaTopicConfig = mapOf(
     TopicConfig.MAX_MESSAGE_BYTES_CONFIG to KAFKA_MAX_MESSAGE_BYTES.toString()
 )
 
+fun getKafkaAdminProperties(params: KafkaStreamsParams): KafkaAdminProperties {
+    return Properties().apply {
+        this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = params.bootstrapServers
+        this[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = params.securityProtocol.toString()
+    }
+}
+
 fun getKafkaProperties(params: KafkaStreamsParams): KafkaProperties {
     // See https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html
 
@@ -32,9 +42,9 @@ fun getKafkaProperties(params: KafkaStreamsParams): KafkaProperties {
         this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = params.bootstrapServers
         this[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = "at_least_once"
         this[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = 1
-        this[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = "PLAINTEXT"
+        this[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = params.securityProtocol.toString()
 
         // Testing
-        this[StreamsConfig.REPLICATION_FACTOR_CONFIG] = 1
+        this[StreamsConfig.REPLICATION_FACTOR_CONFIG] = params.replicationFactor
         this[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
         this[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 1
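`getKafkaAdminProperties` deliberately carries only connection-level settings (bootstrap servers and security protocol), so a single `KafkaStreamsParams` instance can feed both the streams client and the admin client. A rough usage sketch, assuming the same package as `Configuration.kt`; the hard-coded values and the `describeCluster` probe are illustrative, not part of the patch:

```kotlin
package io.seldon.dataflow.kafka

import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.security.auth.SecurityProtocol

// Illustrative only: values mirror local.properties below.
fun main() {
    val params = KafkaStreamsParams(
        bootstrapServers = "localhost:9092",
        securityProtocol = SecurityProtocol.PLAINTEXT,
        numPartitions = 1,
        replicationFactor = 1,
    )
    // The same properties KafkaAdmin now receives via Main.
    Admin.create(getKafkaAdminProperties(params)).use { admin ->
        println(admin.describeCluster().nodes().get())
    }
}
```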
diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/KafkaAdmin.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/KafkaAdmin.kt
index dfdf8f57c1..7dfdc14f40 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/KafkaAdmin.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/KafkaAdmin.kt
@@ -1,20 +1,18 @@
 package io.seldon.dataflow.kafka
 
-import io.seldon.dataflow.parallel
 import io.seldon.mlops.chainer.ChainerOuterClass.PipelineStepUpdate
-import kotlinx.coroutines.flow.asFlow
-import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.KafkaFuture
-import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.TopicExistsException
 import java.util.concurrent.ExecutionException
 import io.klogging.logger as coLogger
 
-class KafkaAdmin(kafkaProperties: KafkaProperties) {
-    private val adminClient = Admin.create(kafkaProperties)
+class KafkaAdmin(
+    adminConfig: KafkaAdminProperties,
+    private val streamsConfig: KafkaStreamsParams,
+) {
+    private val adminClient = Admin.create(adminConfig)
 
 
     suspend fun ensureTopicsExist(
@@ -27,7 +25,13 @@ class KafkaAdmin(kafkaProperties: KafkaProperties) {
             .also {
                 logger.info("Topics found are $it")
             }
-            .map { topicName -> NewTopic(topicName, 1, 1).configs(kafkaTopicConfig) }
+            .map { topicName ->
+                NewTopic(
+                    topicName,
+                    streamsConfig.numPartitions,
+                    streamsConfig.replicationFactor.toShort(),
+                ).configs(kafkaTopicConfig)
+            }
             .run { adminClient.createTopics(this) }
             .values()
             .also { topicCreations ->
diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Types.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Types.kt
index 8ae6c9af34..8b98b9e95b 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Types.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/kafka/Types.kt
@@ -7,6 +7,7 @@ import org.apache.kafka.streams.kstream.StreamJoined
 import java.util.*
 
 typealias KafkaProperties = Properties
+typealias KafkaAdminProperties = Properties
 typealias TopicName = String
 typealias TensorName = String
 typealias RequestId = String
diff --git a/scheduler/data-flow/src/main/resources/local.properties b/scheduler/data-flow/src/main/resources/local.properties
index e9730c86a5..04c6448643 100644
--- a/scheduler/data-flow/src/main/resources/local.properties
+++ b/scheduler/data-flow/src/main/resources/local.properties
@@ -1,8 +1,8 @@
 kafka.bootstrap.servers=localhost:9092
+kafka.security.protocol=PLAINTEXT
 kafka.partitions.default=1
 kafka.replication.factor=1
 kafka.state.clean=true
 kafka.join.window.millis=60000
 upstream.host=0.0.0.0
 upstream.port=9008
-cores.count=4
\ No newline at end of file
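Behavioural note on topic creation: topics now get the configured partition count and replication factor instead of the hard-coded `1, 1`, and since Kafka's `NewTopic` takes a `Short` while the config key is an `Int`, the value is narrowed with `.toShort()`. A condensed sketch of the create-then-tolerate-existing pattern that `ensureTopicsExist` relies on; the helper name and signature are made up for illustration:

```kotlin
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.util.concurrent.ExecutionException

// Hypothetical helper mirroring KafkaAdmin.ensureTopicsExist.
fun ensureTopics(admin: Admin, names: List<String>, partitions: Int, replicationFactor: Int) {
    val creations = admin
        .createTopics(names.map { NewTopic(it, partitions, replicationFactor.toShort()) })
        .values()
    creations.forEach { (topic, future) ->
        try {
            future.get()
        } catch (e: ExecutionException) {
            // An already-existing topic is fine; anything else propagates.
            if (e.cause !is TopicExistsException) throw e
            println("Topic $topic already exists")
        }
    }
}
```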