Skip to content

Commit

Permalink
Issue 445 scheduler model autoscale (#472)
Browse files Browse the repository at this point in the history
* fix small typo

* do not set default min replicas

* consider model loaded on replica when filtering

* add unit test

* add more tests

* add very simple crude model scaling logic

* refactor function

* lint fixes

* lint fixes

* add test

* reorder funcs

* add smoke test

* fix func name

* add scaling down test

* refactor

* lint

* simplify test

* set defaults if min / max replicas are missing

* add range checks

* tidy up scaling range logic

* more tidy up

* add more tests

* add cooling off period

* fix lint

* add range checks for replicas

* remove unused env variables

* remove lock

* revert REQUESTS_CA_BUNDLE (#491)

* tidy up notebook with more models for triton (#494)

* Fix Strimzi Helm values ZK indent bug + stale broker service name (#492)

* Fix strimzi helm values bug for zookeeper persistent storage

* fix kafka bootstrap location to be internal SVC

* fix log message

* add env variables to runner

* tidy up comment

* switch log to debug

* do not update k8s meta if nil

* tidy up model name

* fix decrement bug

Co-authored-by: cliveseldon <cc@seldon.io>
  • Loading branch information
sakoush and ukclivecox authored Oct 14, 2022
1 parent feaf14d commit 316abe1
Show file tree
Hide file tree
Showing 19 changed files with 912 additions and 116 deletions.
22 changes: 17 additions & 5 deletions operator/apis/mlops/v1alpha1/model_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ type ExplainerSpec struct {
// ScalingSpec holds the replica settings of a model. Every field is an
// optional pointer and is omitted from JSON when unset.
type ScalingSpec struct {
	// Number of replicas - default 1
	Replicas *int32 `json:"replicas,omitempty"`
	// Min number of replicas - default equal to 0
	MinReplicas *int32 `json:"minReplicas,omitempty"`
	// Max number of replicas - default equal to 0
	MaxReplicas *int32 `json:"maxReplicas,omitempty"`
}

Expand Down Expand Up @@ -192,21 +192,33 @@ func (m Model) AsSchedulerModel() (*scheduler.Model, error) {
md.ModelSpec.Requirements = append(md.ModelSpec.Requirements, *m.Spec.ModelType)
}
// Set Replicas
//TODO add min/max replicas
if m.Spec.Replicas != nil {
md.DeploymentSpec.Replicas = uint32(*m.Spec.Replicas)
} else {
md.DeploymentSpec.Replicas = 1
if m.Spec.MinReplicas != nil {
// set replicas to the min replicas if not set
md.DeploymentSpec.Replicas = uint32(*m.Spec.MinReplicas)
} else {
md.DeploymentSpec.Replicas = 1
}
}

if m.Spec.MinReplicas != nil {
md.DeploymentSpec.MinReplicas = uint32(*m.Spec.MinReplicas)
if md.DeploymentSpec.Replicas < md.DeploymentSpec.MinReplicas {
return nil, fmt.Errorf("Number of replicas %d should be >= min replicas %d", md.DeploymentSpec.Replicas, md.DeploymentSpec.MinReplicas)
}
} else {
md.DeploymentSpec.MinReplicas = 1
md.DeploymentSpec.MinReplicas = 0
}

if m.Spec.MaxReplicas != nil {
md.DeploymentSpec.MaxReplicas = uint32(*m.Spec.MaxReplicas)
if md.DeploymentSpec.Replicas > md.DeploymentSpec.MaxReplicas {
return nil, fmt.Errorf("Number of replicas %d should be <= max replicas %d", md.DeploymentSpec.Replicas, md.DeploymentSpec.MaxReplicas)
}
} else {
md.DeploymentSpec.MaxReplicas = 0
}

// Set memory bytes
Expand Down
148 changes: 145 additions & 3 deletions operator/apis/mlops/v1alpha1/model_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestAsModelDetails(t *testing.T) {
error bool
}
replicas := int32(4)
replicas1 := int32(1)
secret := "secret"
modelType := "sklearn"
server := "server"
Expand Down Expand Up @@ -53,7 +54,8 @@ func TestAsModelDetails(t *testing.T) {
},
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 1,
MinReplicas: 1,
MinReplicas: 0,
MaxReplicas: 0,
},
},
},
Expand Down Expand Up @@ -122,7 +124,8 @@ func TestAsModelDetails(t *testing.T) {
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 4,
LogPayloads: true,
MinReplicas: 1,
MinReplicas: 0,
MaxReplicas: 0,
},
},
},
Expand Down Expand Up @@ -155,10 +158,149 @@ func TestAsModelDetails(t *testing.T) {
},
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 1,
MinReplicas: 1,
MinReplicas: 0,
MaxReplicas: 0,
},
},
},
{
name: "simple min replica",
model: &Model{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
ResourceVersion: "1",
Generation: 1,
},
Spec: ModelSpec{
InferenceArtifactSpec: InferenceArtifactSpec{
StorageURI: "gs://test",
},
ScalingSpec: ScalingSpec{MinReplicas: &replicas},
},
},
modelpb: &scheduler.Model{
Meta: &scheduler.MetaData{
Name: "foo",
KubernetesMeta: &scheduler.KubernetesMeta{
Namespace: "default",
Generation: 1,
},
},
ModelSpec: &scheduler.ModelSpec{
Uri: "gs://test",
},
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 4,
MinReplicas: 4,
MaxReplicas: 0,
},
},
},
{
name: "simple max replica",
model: &Model{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
ResourceVersion: "1",
Generation: 1,
},
Spec: ModelSpec{
InferenceArtifactSpec: InferenceArtifactSpec{
StorageURI: "gs://test",
},
ScalingSpec: ScalingSpec{MaxReplicas: &replicas},
},
},
modelpb: &scheduler.Model{
Meta: &scheduler.MetaData{
Name: "foo",
KubernetesMeta: &scheduler.KubernetesMeta{
Namespace: "default",
Generation: 1,
},
},
ModelSpec: &scheduler.ModelSpec{
Uri: "gs://test",
},
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 1,
MinReplicas: 0,
MaxReplicas: 4,
},
},
},
{
name: "range violation min",
model: &Model{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
ResourceVersion: "1",
Generation: 1,
},
Spec: ModelSpec{
InferenceArtifactSpec: InferenceArtifactSpec{
StorageURI: "gs://test",
},
ScalingSpec: ScalingSpec{MinReplicas: &replicas, Replicas: &replicas1},
},
},
modelpb: &scheduler.Model{
Meta: &scheduler.MetaData{
Name: "foo",
KubernetesMeta: &scheduler.KubernetesMeta{
Namespace: "default",
Generation: 1,
},
},
ModelSpec: &scheduler.ModelSpec{
Uri: "gs://test",
},
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 1,
MinReplicas: 0,
MaxReplicas: 4,
},
},
error: true,
},
{
name: "range violation max",
model: &Model{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
ResourceVersion: "1",
Generation: 1,
},
Spec: ModelSpec{
InferenceArtifactSpec: InferenceArtifactSpec{
StorageURI: "gs://test",
},
ScalingSpec: ScalingSpec{Replicas: &replicas, MaxReplicas: &replicas1},
},
},
modelpb: &scheduler.Model{
Meta: &scheduler.MetaData{
Name: "foo",
KubernetesMeta: &scheduler.KubernetesMeta{
Namespace: "default",
Generation: 1,
},
},
ModelSpec: &scheduler.ModelSpec{
Uri: "gs://test",
},
DeploymentSpec: &scheduler.DeploymentSpec{
Replicas: 1,
MinReplicas: 0,
MaxReplicas: 4,
},
},
error: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions scheduler/all-internal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ services:
- SELDON_SERVER_TYPE=mlserver
- SELDON_SERVER_CAPABILITIES=mlserver,alibi-detect,alibi-explain,lightgbm,mlflow,python,sklearn,spark-mlib,xgboost
ports:
- "${AGENT_MLSERVER_HTTP_PORT}:${AGENT_MLSERVER_HTTP_PORT}"
- "${AGENT_MLSERVER_GRPC_PORT}:${AGENT_MLSERVER_GRPC_PORT}"
- "${SELDON_MLSERVER_REVERSE_PROXY_HTTP_PORT}:${SELDON_MLSERVER_REVERSE_PROXY_HTTP_PORT}"
- "${SELDON_MLSERVER_REVERSE_PROXY_GRPC_PORT}:${SELDON_MLSERVER_REVERSE_PROXY_GRPC_PORT}"
- "${AGENT_MLSERVER_METRICS_PORT}:${AGENT_MLSERVER_METRICS_PORT}"
Expand Down Expand Up @@ -72,8 +70,6 @@ services:
environment:
- SELDON_METRICS_PORT=${AGENT_TRITON_METRICS_PORT}
ports:
- "${AGENT_TRITON_HTTP_PORT}:${AGENT_TRITON_HTTP_PORT}"
- "${AGENT_TRITON_GRPC_PORT}:${AGENT_TRITON_GRPC_PORT}"
- "${SELDON_TRITON_REVERSE_PROXY_HTTP_PORT}:${SELDON_TRITON_REVERSE_PROXY_HTTP_PORT}"
- "${SELDON_TRITON_REVERSE_PROXY_GRPC_PORT}:${SELDON_TRITON_REVERSE_PROXY_GRPC_PORT}"
- "${AGENT_TRITON_METRICS_PORT}:${AGENT_TRITON_METRICS_PORT}"
Expand Down
1 change: 1 addition & 0 deletions scheduler/cmd/agent/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func maybeUpdateFromIntEnv(flag string, env string, param *int, tag string) {
}

log.Infof(
"Setting %s from env %s with value %d",
tag,
env,
int(valueFromEnv),
Expand Down
4 changes: 0 additions & 4 deletions scheduler/env.all
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
AGENT_MLSERVER_HTTP_PORT=8090
AGENT_MLSERVER_GRPC_PORT=8091
AGENT_MLSERVER_DEBUG_PORT=7777
AGENT_MLSERVER_METRICS_PORT=9006
RCLONE_MLSERVER_HTTP_PORT=5572
AGENT_TRITON_HTTP_PORT=8092
AGENT_TRITON_GRPC_PORT=8093
AGENT_TRITON_DEBUG_PORT=7778
AGENT_TRITON_METRICS_PORT=9007
RCLONE_TRITON_HTTP_PORT=5573
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/agent/modelscaling/model_lag.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func (lagKeeper *lagKeeper) dec() {
// therefore we cannot use `AddUint32(&x, ^uint32(0))`.
// the for loop is essentially to make sure that no concurrent requests have
// changed the value of the `lag` while we are decrementing it.
old := lagKeeper.get()
for {
old := lagKeeper.get()
if old > 0 {
new := old - 1
swapped := atomic.CompareAndSwapUint32(&lagKeeper.lag, old, new)
Expand Down
103 changes: 103 additions & 0 deletions scheduler/pkg/agent/modelscaling/stats_analyser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package modelscaling

import (
"strconv"
"sync"
"testing"
"time"

Expand All @@ -15,6 +17,25 @@ const (
lastUsedThresholdSecondsDefault = 30
)

// scalingMetricsSetup calls IncDefault on modelLagStats for internalModelName,
// then marks wg as done (regardless of the outcome of that call) so that a
// paired teardown waiting on wg can proceed. If the lag call failed its error
// is returned and modelLastUsedStats is left untouched; otherwise the result of
// IncDefault on modelLastUsedStats is returned.
func scalingMetricsSetup(
	wg *sync.WaitGroup, internalModelName string,
	modelLagStats, modelLastUsedStats interfaces.ModelScalingStats,
) error {
	lagErr := modelLagStats.IncDefault(internalModelName)
	// Signal completion of the lag update before inspecting its result.
	wg.Done()
	if lagErr == nil {
		return modelLastUsedStats.IncDefault(internalModelName)
	}
	return lagErr
}

// scalingMetricsTearDown blocks on wg, then calls DecDefault on modelLagStats
// for internalModelName, marks one job as finished on jobsWg and returns the
// DecDefault error (if any). modelLastUsedStats is accepted but not used here.
func scalingMetricsTearDown(
	wg *sync.WaitGroup,
	internalModelName string,
	modelLagStats, modelLastUsedStats interfaces.ModelScalingStats,
	jobsWg *sync.WaitGroup,
) error {
	// Do not decrement until the matching setup has signalled wg.
	wg.Wait()
	decErr := modelLagStats.DecDefault(internalModelName)
	// The job counts as finished whether or not the decrement succeeded.
	jobsWg.Done()
	return decErr
}

func TestStatsAnalyserSmoke(t *testing.T) {
g := NewGomegaWithT(t)
dummyModelPrefix := "model_"
Expand Down Expand Up @@ -78,3 +99,85 @@ func TestStatsAnalyserSmoke(t *testing.T) {

t.Logf("Done!")
}

// TestStatsAnalyserSoak starts a stats analyser service wired to a lag keeper
// and a last-used keeper, registers numberModels models and then, for every
// iteration and model, launches one setup goroutine and one teardown goroutine
// that update the stats concurrently. It waits for all teardown jobs to
// finish, deletes the models and stops the service.
//
// Events emitted by the service are discarded by a background goroutine.
func TestStatsAnalyserSoak(t *testing.T) {
	numberIterations := 1000
	numberModels := 100

	g := NewGomegaWithT(t)
	dummyModelPrefix := "model_"

	t.Logf("Start!")

	lags := NewModelReplicaLagsKeeper()
	lastUsed := NewModelReplicaLastUsedKeeper()
	service := NewStatsAnalyserService(
		[]ModelScalingStatsWrapper{
			{
				Stats:     lags,
				Operator:  interfaces.Gte,
				Threshold: lagThresholdDefault,
				Reset:     true,
				EventType: ScaleUpEvent,
			},
			{
				Stats:     lastUsed,
				Operator:  interfaces.Gte,
				Threshold: lastUsedThresholdSecondsDefault,
				Reset:     false,
				EventType: ScaleDownEvent,
			},
		},
		log.New(),
		statsPeriodSecondsDefault,
	)

	err := service.Start()

	time.Sleep(time.Millisecond * 100) // for the service to actually start

	g.Expect(err).To(BeNil())
	g.Expect(service.isReady).To(BeTrue())

	for j := 0; j < numberModels; j++ {
		err := service.AddModel(dummyModelPrefix + strconv.Itoa(j))
		g.Expect(err).To(BeNil())
	}

	ch := service.GetEventChannel()

	// One Done per (iteration, model) pair, issued by the teardown goroutine.
	var jobsWg sync.WaitGroup
	jobsWg.Add(numberIterations * numberModels)

	for i := 0; i < numberIterations; i++ {
		for j := 0; j < numberModels; j++ {
			// wg is per pair: it orders the teardown after the setup's lag update.
			var wg sync.WaitGroup
			wg.Add(1)
			setupFn := func(x int) {
				err := scalingMetricsSetup(&wg, dummyModelPrefix+strconv.Itoa(x), lags, lastUsed)
				g.Expect(err).To(BeNil())
			}
			teardownFn := func(x int) {
				err := scalingMetricsTearDown(&wg, dummyModelPrefix+strconv.Itoa(x), lags, lastUsed, &jobsWg)
				g.Expect(err).To(BeNil())
			}
			go setupFn(j)
			go teardownFn(j)
		}
	}
	go func() {
		// dump messages on the floor: keep receiving so that every event is
		// discarded, not just the first one. The loop ends if ch is closed.
		for range ch {
		}
	}()
	jobsWg.Wait()

	// delete
	for j := 0; j < numberModels; j++ {
		err := service.DeleteModel(dummyModelPrefix + strconv.Itoa(j))
		g.Expect(err).To(BeNil())
	}

	_ = service.Stop()

	t.Logf("Done!")
}
Loading

0 comments on commit 316abe1

Please sign in to comment.