Skip to content

Commit

Permalink
Sherif akoush/improve replica sorting (#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush authored Jun 9, 2022
1 parent 330aefb commit c0ca3fe
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 116 deletions.
8 changes: 0 additions & 8 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"flag"
"io/ioutil"
"math/rand"
"net/http"
"time"

"github.com/seldonio/seldon-core/scheduler/pkg/tracing"
Expand All @@ -46,9 +45,6 @@ import (
"github.com/seldonio/seldon-core/scheduler/pkg/envoy/server"
"github.com/seldonio/seldon-core/scheduler/pkg/scheduler"
log "github.com/sirupsen/logrus"

_ "net/http"
_ "net/http/pprof"
)

var (
Expand Down Expand Up @@ -107,10 +103,6 @@ func getNamespace() string {
}

func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()

logger := log.New()
flag.Parse()
logIntLevel, err := log.ParseLevel(logLevel)
Expand Down
23 changes: 13 additions & 10 deletions scheduler/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

type SimpleScheduler struct {
mu sync.RWMutex
store store.ModelStore
logger log.FieldLogger
muFailedModels sync.RWMutex
muSortAndUpdate sync.Mutex
store store.ModelStore
logger log.FieldLogger
SchedulerConfig
failedModels map[string]bool
}
Expand All @@ -34,7 +35,7 @@ func DefaultSchedulerConfig(store store.ModelStore) SchedulerConfig {
serverFilters: []ServerFilter{filters.SharingServerFilter{}, filters.DeletedServerFilter{}},
replicaFilters: []ReplicaFilter{filters.RequirementsReplicaFilter{}, filters.AvailableMemoryReplicaFilter{}},
serverSorts: []sorters.ServerSorter{},
replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemoryWhileLoadingSorter{Store: store}, sorters.ModelAlreadyLoadedSorter{}},
replicaSorts: []sorters.ReplicaSorter{sorters.ReplicaIndexSorter{}, sorters.AvailableMemorySorter{}, sorters.ModelAlreadyLoadedSorter{}},
}
}

Expand All @@ -54,17 +55,17 @@ func (s *SimpleScheduler) Schedule(modelKey string) error {
err := s.scheduleToServer(modelKey)
// Set model state using error
if err != nil {
s.mu.Lock()
defer s.mu.Unlock()
s.muFailedModels.Lock()
defer s.muFailedModels.Unlock()
s.failedModels[modelKey] = true
return err
}
return nil
}

func (s *SimpleScheduler) ScheduleFailedModels() ([]string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
s.muFailedModels.RLock()
defer s.muFailedModels.RUnlock()
var updatedModels []string
for modelName := range s.failedModels {
err := s.scheduleToServer(modelName)
Expand Down Expand Up @@ -127,11 +128,13 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
continue
}

// TODO: do we need a lock here? we could have many goroutines at sorting
// we need a lock here, we could have many goroutines at sorting
// without the store being reflected and hence storing on stale values
s.muSortAndUpdate.Lock()
s.sortReplicas(candidateReplicas)

err = s.store.UpdateLoadedModels(modelName, latestModel.GetVersion(), candidateServer.Name, candidateReplicas.ChosenReplicas[0:latestModel.DesiredReplicas()])
s.muSortAndUpdate.Unlock()

if err != nil {
logger.Warnf("Failed to update model replicas for model %s on server %s", modelName, candidateServer.Name)
} else {
Expand Down
36 changes: 5 additions & 31 deletions scheduler/pkg/scheduler/sorters/replicamemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,19 @@ package sorters

import (
"math"

"github.com/seldonio/seldon-core/scheduler/pkg/store"
)

// Rationale - put models on replicas with more memory, including the models that are currently loading
// note that we can double count here as available memory (returned by the agent) could include memory
// that is allocated while the model is being loaded.
type AvailableMemoryWhileLoadingSorter struct {
Store store.ModelStore
}
type AvailableMemorySorter struct{}

func (m AvailableMemoryWhileLoadingSorter) Name() string {
func (m AvailableMemorySorter) Name() string {
return "AvailableMemorySorter"
}

func (m AvailableMemoryWhileLoadingSorter) IsLess(i *CandidateReplica, j *CandidateReplica) bool {
iMem := math.Max(0, float64(i.Replica.GetAvailableMemory()-getTotalMemoryBytesForLoadingModels(i, m.Store)))
jMem := math.Max(0, float64(j.Replica.GetAvailableMemory()-getTotalMemoryBytesForLoadingModels(j, m.Store)))
func (m AvailableMemorySorter) IsLess(i *CandidateReplica, j *CandidateReplica) bool {
iMem := math.Max(0, float64(i.Replica.GetAvailableMemory()-i.Replica.GetReservedMemory()))
jMem := math.Max(0, float64(j.Replica.GetAvailableMemory()-j.Replica.GetReservedMemory()))
return iMem > jMem
}

func getTotalMemoryBytesForLoadingModels(r *CandidateReplica, s store.ModelStore) uint64 {
mem := uint64(0)
if s == nil {
return mem
}
models, err := s.GetModels()
if err != nil {
return mem
}
for _, model := range models {
for _, version := range model.Versions {
if version.Server() == r.Server.Name {
replicaState := version.GetModelReplicaState(r.Replica.GetReplicaIdx())
if replicaState.IsLoading() {
mem += version.GetRequiredMemory()
}
}
}
}
return mem
}
2 changes: 1 addition & 1 deletion scheduler/pkg/scheduler/sorters/replicamemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestReplicaMemorySort(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sorter := AvailableMemoryWhileLoadingSorter{}
sorter := AvailableMemorySorter{}
sort.SliceStable(test.replicas, func(i, j int) bool { return sorter.IsLess(test.replicas[i], test.replicas[j]) })
for idx, expected := range test.ordering {
g.Expect(test.replicas[idx].Replica.GetReplicaIdx()).To(Equal(expected))
Expand Down
22 changes: 20 additions & 2 deletions scheduler/pkg/store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (m *MemoryStore) updateLoadedModelsImpl(
modelVersion := model.Latest()
if version != modelVersion.GetVersion() {
return nil, fmt.Errorf(
"Model versiion mismatch for %s got %d but latest version is now %d",
"Model version mismatch for %s got %d but latest version is now %d",
modelKey, version, modelVersion.GetVersion(),
)
}
Expand Down Expand Up @@ -321,6 +321,7 @@ func (m *MemoryStore) updateLoadedModelsImpl(
modelKey, modelVersion.version, serverKey, replica.GetReplicaIdx(),
)
modelVersion.SetReplicaState(replica.GetReplicaIdx(), ReplicaStatus{State: LoadRequested})
m.updateReservedMemory(LoadRequested, serverKey, replica.GetReplicaIdx(), modelVersion.GetRequiredMemory())
updated = true
} else {
logger.Debugf(
Expand Down Expand Up @@ -443,7 +444,22 @@ func (m *MemoryStore) UpdateModelState(
}
return nil
}

func (m *MemoryStore) updateReservedMemory(
modelReplicaState ModelReplicaState, serverKey string, replicaIdx int, memBytes uint64) {
// update reserved memory that is being used for sorting replicas
// do we need to lock replica update?
server, ok := m.store.servers[serverKey]
if ok {
replica, okReplica := server.replicas[replicaIdx]
if okReplica {
if modelReplicaState == LoadRequested {
replica.UpdateReservedMemory(memBytes, true)
} else if modelReplicaState == LoadFailed || modelReplicaState == Loaded {
replica.UpdateReservedMemory(memBytes, false)
}
}
}
}
func (m *MemoryStore) updateModelStateImpl(
modelKey string,
version uint32,
Expand Down Expand Up @@ -473,6 +489,8 @@ func (m *MemoryStore) updateModelStateImpl(
)
}

m.updateReservedMemory(desiredState, serverKey, replicaIdx, modelVersion.GetRequiredMemory())

if existingState != desiredState {
latestModel := model.Latest()
isLatest := latestModel.GetVersion() == modelVersion.GetVersion()
Expand Down
Loading

0 comments on commit c0ca3fe

Please sign in to comment.