Modelgateway issue (#296)
* Small update to docs

* Fix modelgateway responsiveness to topics

* Ensure Kafka auto.offset.reset is set to earliest in the default YAML config for k8s

* Address review comments
ukclivecox authored Jun 18, 2022
1 parent a92bda9 commit 7f07a97
Showing 15 changed files with 475 additions and 75 deletions.
2 changes: 1 addition & 1 deletion k8s/yaml/seldon-v2-components.yaml
@@ -419,7 +419,7 @@ data:
 "bootstrap.servers": "seldon-kafka-plain-bootstrap.kafka:9092",
 "consumer":{
 "session.timeout.ms":6000,
-"auto.offset.reset":"latest",
+"auto.offset.reset":"earliest",
 "topic.metadata.refresh.interval.ms": 1000
 },
 "producer":{
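For context, a minimal sketch (not part of this commit) of how a consumer block like the one above maps onto librdkafka settings via confluent-kafka-go; the bootstrap address, group id, and topic name here are placeholders. With auto.offset.reset set to earliest, a consumer group with no committed offsets starts from the beginning of the topic instead of skipping records that already exist, which appears to be the motivation behind this change.

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Placeholder bootstrap address, group id and topic; illustration only.
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost:9092",
        "group.id":           "example-group",
        "session.timeout.ms": 6000,
        "auto.offset.reset":  "earliest", // start from the oldest record when no offset is committed
    })
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    if err := consumer.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
        panic(err)
    }
    // Read a single message (or time out) just to show the consumer working.
    if msg, err := consumer.ReadMessage(10 * time.Second); err == nil {
        fmt.Printf("received: %s\n", string(msg.Value))
    }
}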
95 changes: 95 additions & 0 deletions samples/pipeline-tests.sh
@@ -0,0 +1,95 @@
seldon model load -f ./models/tfsimple1.yaml
seldon model load -f ./models/tfsimple2.yaml
seldon model status tfsimple1 -w ModelAvailable | jq -M .
seldon model status tfsimple2 -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/tfsimples.yaml
seldon pipeline status tfsimples -w PipelineReady | jq -M .
seldon pipeline infer tfsimples '{"inputs":[{"name":"INPUT0","data":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","data":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],"datatype":"INT32","shape":[1,16]}]}'
seldon pipeline infer tfsimples --inference-mode grpc '{"model_name":"simple","inputs":[{"name":"INPUT0","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]}]}'
seldon pipeline unload tfsimples
seldon model unload tfsimple1
seldon model unload tfsimple2


seldon model load -f ./models/tfsimple1.yaml
seldon model load -f ./models/tfsimple2.yaml
seldon model load -f ./models/tfsimple3.yaml
seldon model status tfsimple1 -w ModelAvailable | jq -M .
seldon model status tfsimple2 -w ModelAvailable | jq -M .
seldon model status tfsimple3 -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/tfsimples-join.yaml
seldon pipeline status join -w PipelineReady | jq -M .
seldon pipeline infer join --inference-mode grpc '{"model_name":"simple","inputs":[{"name":"INPUT0","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]}]}'
seldon pipeline unload join
seldon model unload tfsimple1
seldon model unload tfsimple2
seldon model unload tfsimple3

seldon model load -f ./models/conditional.yaml
seldon model load -f ./models/add10.yaml
seldon model load -f ./models/mul10.yaml
seldon model status conditional -w ModelAvailable | jq -M .
seldon model status add10 -w ModelAvailable | jq -M .
seldon model status mul10 -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/conditional.yaml
seldon pipeline status tfsimple-conditional -w PipelineReady | jq -M .
seldon pipeline infer tfsimple-conditional --inference-mode grpc '{"model_name":"outlier","inputs":[{"name":"CHOICE","contents":{"int_contents":[0]},"datatype":"INT32","shape":[1]},{"name":"INPUT0","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]},{"name":"INPUT1","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]}]}'
seldon pipeline infer tfsimple-conditional --inference-mode grpc '{"model_name":"outlier","inputs":[{"name":"CHOICE","contents":{"int_contents":[1]},"datatype":"INT32","shape":[1]},{"name":"INPUT0","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]},{"name":"INPUT1","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]}]}'
seldon pipeline unload tfsimple-conditional
seldon model unload conditional
seldon model unload add10
seldon model unload mul10


seldon model load -f ./models/outlier-error.yaml
seldon model status outlier-error -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/error.yaml
seldon pipeline status error -w PipelineReady | jq -M .
seldon pipeline infer error --inference-mode grpc '{"model_name":"outlier","inputs":[{"name":"INPUT","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]}]}'
seldon pipeline infer error --inference-mode grpc '{"model_name":"outlier","inputs":[{"name":"INPUT","contents":{"fp32_contents":[100,2,3,4]},"datatype":"FP32","shape":[4]}]}'
seldon pipeline unload error
seldon model unload outlier-error


seldon model load -f ./models/tfsimple1.yaml
seldon model load -f ./models/tfsimple2.yaml
seldon model load -f ./models/tfsimple3.yaml
seldon model load -f ./models/check.yaml
seldon model status tfsimple1 -w ModelAvailable | jq -M .
seldon model status tfsimple2 -w ModelAvailable | jq -M .
seldon model status tfsimple3 -w ModelAvailable | jq -M .
seldon model status check -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/tfsimples-join-outlier.yaml
seldon pipeline status joincheck -w PipelineReady | jq -M .
seldon pipeline infer joincheck --inference-mode grpc '{"model_name":"simple","inputs":[{"name":"INPUT0","contents":{"int_contents":[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]},"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","contents":{"int_contents":[1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]},"datatype":"INT32","shape":[1,16]}]}'
seldon pipeline infer joincheck --inference-mode grpc '{"model_name":"simple","inputs":[{"name":"INPUT0","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]}]}'
seldon pipeline unload joincheck
seldon model unload tfsimple1
seldon model unload tfsimple2
seldon model unload tfsimple3
seldon model unload check


seldon model load -f ./models/mul10.yaml
seldon model load -f ./models/add10.yaml
seldon model status mul10 -w ModelAvailable | jq -M .
seldon model status add10 -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/pipeline-inputs.yaml
seldon pipeline status pipeline-inputs -w PipelineReady | jq -M .
seldon pipeline infer pipeline-inputs --inference-mode grpc '{"model_name":"pipeline","inputs":[{"name":"INPUT0","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]},{"name":"INPUT1","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]}]}' | jq -M .
seldon pipeline unload pipeline-inputs
seldon model unload mul10
seldon model unload add10


seldon model load -f ./models/mul10.yaml
seldon model load -f ./models/add10.yaml
seldon model status mul10 -w ModelAvailable | jq -M .
seldon model status add10 -w ModelAvailable | jq -M .
seldon pipeline load -f ./pipelines/trigger-joins.yaml
seldon pipeline status trigger-joins -w PipelineReady | jq -M .
seldon pipeline infer trigger-joins --inference-mode grpc '{"model_name":"pipeline","inputs":[{"name":"ok1","contents":{"fp32_contents":[1]},"datatype":"FP32","shape":[1]},{"name":"INPUT","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]}]}'
seldon pipeline infer trigger-joins --inference-mode grpc '{"model_name":"pipeline","inputs":[{"name":"ok3","contents":{"fp32_contents":[1]},"datatype":"FP32","shape":[1]},{"name":"INPUT","contents":{"fp32_contents":[1,2,3,4]},"datatype":"FP32","shape":[4]}]}'
seldon pipeline unload trigger-joins
seldon model unload mul10
seldon model unload add10
24 changes: 14 additions & 10 deletions scheduler/cmd/modelgateway/main.go
@@ -81,18 +81,18 @@ func makeSignalHandler(logger *log.Logger, done chan<- bool) {
     close(done)
 }
 
-func getNumberWorkers(logger *log.Logger) int {
-    valStr := os.Getenv(gateway.EnvVarNumWorkers)
+func getEnVar(logger *log.Logger, key string, defaultValue int) int {
+    valStr := os.Getenv(key)
     if valStr != "" {
         val, err := strconv.ParseInt(valStr, 10, 64)
         if err != nil {
-            logger.WithError(err).Fatalf("Failed to parse %s", gateway.EnvVarNumWorkers)
+            logger.WithError(err).Fatalf("Failed to parse %s", key)
         }
-        logger.Infof("Setting number of workers to %d", val)
+        logger.Infof("Got %s = %d", key, val)
         return int(val)
     }
-    logger.Infof("Setting number of workers to default %d", gateway.DefaultNumWorkers)
-    return gateway.DefaultNumWorkers
+    logger.Infof("Returning default %s = %d", key, defaultValue)
+    return defaultValue
 }
 
 func main() {
@@ -129,11 +129,15 @@ func main() {
         HttpPort: envoyPort,
         GrpcPort: envoyPort,
     }
-    kafkaConsumer, err := gateway.NewInferKafkaConsumer(logger, getNumberWorkers(logger), kafkaConfigMap, namespace, inferServerConfig, tracer)
-    if err != nil {
-        logger.WithError(err).Fatalf("Failed to create kafka consumer")
+    consumerConfig := gateway.ConsumerConfig{
+        KafkaConfig:           kafkaConfigMap,
+        Namespace:             namespace,
+        InferenceServerConfig: inferServerConfig,
+        TraceProvider:         tracer,
+        NumWorkers:            getEnVar(logger, gateway.EnvVarNumWorkers, gateway.DefaultNumWorkers),
     }
-    go kafkaConsumer.Serve()
+    kafkaConsumer := gateway.NewConsumerManager(logger, &consumerConfig,
+        getEnVar(logger, gateway.EnvMaxModelsPerConsumer, gateway.DefaultMaxModelsPerConsumer))
     defer kafkaConsumer.Stop()
 
     kafkaSchedulerClient := gateway.NewKafkaSchedulerClient(logger, kafkaConsumer)
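As a side note, a self-contained sketch of the env-var helper pattern the gateway generalises above; the variable name and default used here are made up for illustration and are not the gateway's real constants.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// intFromEnv parses an integer from the environment, falling back to a default
// when the variable is unset. The real getEnVar logs and exits on a parse error;
// this sketch simply falls back to the default instead.
func intFromEnv(key string, defaultValue int) int {
    valStr := os.Getenv(key)
    if valStr == "" {
        return defaultValue
    }
    val, err := strconv.ParseInt(valStr, 10, 64)
    if err != nil {
        return defaultValue
    }
    return int(val)
}

func main() {
    // Hypothetical variable name used only for this example.
    fmt.Println(intFromEnv("EXAMPLE_NUM_WORKERS", 8))
}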
3 changes: 1 addition & 2 deletions scheduler/config/kafka-host.json
@@ -2,8 +2,7 @@
 "bootstrap.servers":"0.0.0.0:9092",
 "consumer":{
 "session.timeout.ms":6000,
-"auto.offset.reset":"latest",
-"topic.metadata.refresh.interval.ms": 1000
+"auto.offset.reset":"earliest"
 },
 "producer":{
 "linger.ms":0,
3 changes: 1 addition & 2 deletions scheduler/config/kafka-internal.json
@@ -2,8 +2,7 @@
 "bootstrap.servers":"kafka:9093",
 "consumer":{
 "session.timeout.ms":6000,
-"auto.offset.reset":"latest",
-"topic.metadata.refresh.interval.ms": 1000
+"auto.offset.reset":"earliest"
 },
 "producer":{
 "linger.ms":0,
@@ -39,7 +39,7 @@ fun getKafkaProperties(params: KafkaStreamsParams): KafkaProperties {
     this[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
     this[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 1
 
-    this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+    this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
     this[ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG] = KAFKA_MAX_MESSAGE_BYTES
     this[ConsumerConfig.FETCH_MAX_BYTES_CONFIG] = KAFKA_MAX_MESSAGE_BYTES
     this[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = 60000
2 changes: 1 addition & 1 deletion scheduler/k8s/config/kafka.yaml
@@ -8,7 +8,7 @@ data:
 "bootstrap.servers": "seldon-kafka-plain-bootstrap.kafka:9092",
 "consumer":{
 "session.timeout.ms":6000,
-"auto.offset.reset":"latest",
+"auto.offset.reset":"earliest",
 "topic.metadata.refresh.interval.ms": 1000
 },
 "producer":{
8 changes: 8 additions & 0 deletions scheduler/pkg/kafka/config/config.go
@@ -85,3 +85,11 @@ func convertConfigMap(cm kafka.ConfigMap) (kafka.ConfigMap, error) {
     }
     return r, nil
 }
+
+// Allow us to test whether we have a valid Kafka configuration. For unit tests we can have no bootstrap server.
+// See usages of this method.
+// TODO: in future, allow testing to run without this check.
+func (kc KafkaConfig) HasKafkaBootstrapServer() bool {
+    bs := kc.Consumer[KafkaBootstrapServers]
+    return bs != nil && bs != ""
+}
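A hedged sketch of the kind of guard this helper enables; the type here is a simplified stand-in for the real KafkaConfig, and the "bootstrap.servers" key is an assumption about what KafkaBootstrapServers refers to.

package main

import "fmt"

// kafkaConfig is a simplified stand-in for the real KafkaConfig type.
type kafkaConfig struct {
    Consumer map[string]interface{}
}

// hasBootstrapServer mirrors HasKafkaBootstrapServer: true only when a
// non-empty bootstrap server entry exists in the consumer config.
func (kc kafkaConfig) hasBootstrapServer() bool {
    bs := kc.Consumer["bootstrap.servers"]
    return bs != nil && bs != ""
}

func main() {
    cfg := kafkaConfig{Consumer: map[string]interface{}{}}
    if !cfg.hasBootstrapServer() {
        // e.g. in unit tests: skip creating real Kafka consumers.
        fmt.Println("no bootstrap server configured; skipping consumer setup")
    }
}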
26 changes: 16 additions & 10 deletions scheduler/pkg/kafka/gateway/client.go
@@ -22,22 +22,22 @@ const (
 )
 
 type KafkaSchedulerClient struct {
-    logger             logrus.FieldLogger
-    conn               *grpc.ClientConn
-    callOptions        []grpc.CallOption
-    inferKafkaConsumer *InferKafkaConsumer
+    logger          logrus.FieldLogger
+    conn            *grpc.ClientConn
+    callOptions     []grpc.CallOption
+    consumerManager *ConsumerManager
 }
 
-func NewKafkaSchedulerClient(logger logrus.FieldLogger, inferkafkaConsumer *InferKafkaConsumer) *KafkaSchedulerClient {
+func NewKafkaSchedulerClient(logger logrus.FieldLogger, consumerManager *ConsumerManager) *KafkaSchedulerClient {
     opts := []grpc.CallOption{
         grpc.MaxCallSendMsgSize(math.MaxInt32),
         grpc.MaxCallRecvMsgSize(math.MaxInt32),
     }
 
     return &KafkaSchedulerClient{
-        logger:             logger.WithField("source", "KafkaSchedulerClient"),
-        callOptions:        opts,
-        inferKafkaConsumer: inferkafkaConsumer,
+        logger:          logger.WithField("source", "KafkaSchedulerClient"),
+        callOptions:     opts,
+        consumerManager: consumerManager,
     }
 }

@@ -105,10 +105,16 @@ func (kc *KafkaSchedulerClient) SubscribeModelEvents() error {
             switch latestVersionStatus.State.State {
             case scheduler.ModelStatus_ModelAvailable:
                 logger.Infof("Adding model %s", event.ModelName)
-                kc.inferKafkaConsumer.AddModel(event.ModelName)
+                err := kc.consumerManager.AddModel(event.ModelName)
+                if err != nil {
+                    kc.logger.WithError(err).Errorf("Failed to add model %s", event.ModelName)
+                }
             default:
                 logger.Infof("Removing model %s", event.ModelName)
-                kc.inferKafkaConsumer.RemoveModel(event.ModelName)
+                err := kc.consumerManager.RemoveModel(event.ModelName)
+                if err != nil {
+                    kc.logger.WithError(err).Errorf("Failed to remove model %s", event.ModelName)
+                }
             }
 
         }
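Illustrative only (not the real gateway types): a small sketch of the error-returning surface the scheduler client now codes against, with failures logged rather than silently dropped.

package main

import "log"

// modelConsumer captures the calls the scheduler client makes; the real
// ConsumerManager in the gateway package may expose more than this.
type modelConsumer interface {
    AddModel(name string) error
    RemoveModel(name string) error
}

type noopConsumer struct{}

func (noopConsumer) AddModel(name string) error    { return nil }
func (noopConsumer) RemoveModel(name string) error { return nil }

func handleModelAvailable(c modelConsumer, name string) {
    if err := c.AddModel(name); err != nil {
        log.Printf("failed to add model %s: %v", name, err)
    }
}

func main() {
    handleModelAvailable(noopConsumer{}, "tfsimple1")
}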