refactor: some minor code changes #32

Merged
server/ai_http.go (4 changes: 2 additions & 2 deletions)
@@ -119,7 +119,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {

 	// Check if there is capacity for the request
 	if !orch.CheckAICapacity(pipeline, modelID) {
-		respondWithError(w, fmt.Sprintf("insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
+		respondWithError(w, fmt.Sprintf("Insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
 		return
 	}

@@ -480,7 +480,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request

 	// Check if there is capacity for the request.
 	if !orch.CheckAICapacity(pipeline, modelID) {
-		respondWithError(w, fmt.Sprintf("insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
+		respondWithError(w, fmt.Sprintf("Insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
 		return
 	}

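Note: the capitalization change above is safe for the gateway's retry classification, because isRetryableError (added in server/ai_process.go below) lowercases the error text before substring matching. A minimal standalone sketch; the pipeline and model values are made up:

package main

import (
	"errors"
	"fmt"
	"strings"
)

func main() {
	// The orchestrator now returns a capitalized message, but the gateway
	// lowercases it before matching, so the check still fires.
	err := errors.New("Insufficient capacity for pipeline=live-video-to-video modelID=noop")
	fmt.Println(strings.Contains(strings.ToLower(err.Error()), "insufficient capacity")) // true
}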
server/ai_process.go (22 changes: 20 additions & 2 deletions)
@@ -1356,6 +1356,23 @@ func processImageToText(ctx context.Context, params aiRequestParams, req worker.
 	return txtResp, nil
 }

+// isRetryableError reports whether the error is a transient error that can be retried.
+func isRetryableError(err error) bool {
+	transientErrorMessages := []string{
+		"insufficient capacity",      // Caused by a limitation in our current implementation.
+		"invalid ticket sendernonce", // Caused by a gateway nonce mismatch.
+		"ticketparams expired",       // Caused by ticket expiration.
+	}
+
+	errMsg := strings.ToLower(err.Error())
+	for _, msg := range transientErrorMessages {
+		if strings.Contains(errMsg, msg) {
+			return true
+		}
+	}
+	return false
+}
+
 func processAIRequest(ctx context.Context, params aiRequestParams, req interface{}) (interface{}, error) {
 	var cap core.Capability
 	var modelID string
@@ -1503,12 +1520,13 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
 			break
 		}

-		//errors that do not result in suspending the orchestrator
-		if strings.Contains(err.Error(), "insufficient capacity") || strings.Contains(err.Error(), "invalid ticket senderNonce") {
+		// Don't suspend the session if the error is a transient error.
+		if isRetryableError(err) {
 			params.sessManager.Complete(ctx, sess)
 			continue
 		}

+		// Suspend the session on other errors.
 		clog.Infof(ctx, "Error submitting request modelID=%v try=%v orch=%v err=%v", modelID, tries, sess.Transcoder(), err)
 		params.sessManager.Remove(ctx, sess) //TODO: Improve session selection logic for live-video-to-video

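Unlike the old inline check, the helper also treats expired TicketParams as transient. A rough sketch of the intended classification; it assumes isRetryableError is in scope, and the error strings are illustrative rather than exact upstream messages:

for _, err := range []error{
	errors.New("Insufficient capacity for pipeline=p modelID=m"), // retryable: session completed, next orchestrator tried
	errors.New("invalid ticket senderNonce"),                     // retryable: gateway nonce mismatch
	errors.New("TicketParams expired"),                           // retryable: newly covered by this PR
	errors.New("context deadline exceeded"),                      // not retryable: session removed and orchestrator suspended
} {
	fmt.Println(isRetryableError(err))
}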
server/ai_process_test.go (33 changes: 33 additions & 0 deletions)
@@ -2,6 +2,7 @@ package server

 import (
 	"context"
+	"errors"
 	"reflect"
 	"testing"

@@ -88,6 +89,38 @@ func Test_submitAudioToText(t *testing.T) {
 	}
 }

+func Test_isRetryableError(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{
+			name: "insufficient capacity error",
+			err:  errors.New("Insufficient capacity"),
+			want: true,
+		},
+		{
+			name: "INSUFFICIENT capacity ERROR",
+			err:  errors.New("INSUFFICIENT capacity ERROR"),
+			want: true,
+		},
+		{
+			name: "non-retryable error",
+			err:  errors.New("some other error"),
+			want: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := isRetryableError(tt.err); got != tt.want {
+				t.Errorf("isRetryableError() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
 func TestEncodeReqMetadata(t *testing.T) {
 	tests := []struct {
 		name string
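The table above could also exercise the other two transient messages. Possible additional cases (not part of the PR; the error strings are assumptions based on the substrings the helper matches):

{
	name: "gateway nonce mismatch",
	err:  errors.New("invalid ticket senderNonce"),
	want: true,
},
{
	name: "expired ticket params",
	err:  errors.New("TicketParams expired"),
	want: true,
},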
server/ai_session.go (32 changes: 15 additions & 17 deletions)
@@ -124,17 +124,14 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
 	delete(pool.sessMap, sess.Transcoder())
 	pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)

-	penalty := 0
 	// If this method is called assume that the orch should be suspended
 	// as well. Since AISessionManager re-uses the pools the suspension
 	// penalty needs to consider the current suspender count to set the penalty
 	lastCount, ok := pool.suspender.list[sess.Transcoder()]
+	penalty := pool.suspender.count + pool.penalty
 	if ok {
-		penalty = pool.suspender.count - lastCount + pool.penalty
-	} else {
-		penalty = pool.suspender.count + pool.penalty
+		penalty -= lastCount
 	}

 	pool.suspender.suspend(sess.Transcoder(), penalty)
 }
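The refactor above is behavior-preserving: both old branches reduce to penalty = suspender.count + pool.penalty - lastCount, with lastCount = 0 when the orchestrator has no entry in the suspender list. A standalone sketch that mirrors the computation with plain ints:

package main

import "fmt"

// penalty mirrors the refactored computation in AISessionPool.Remove.
func penalty(count, poolPenalty, lastCount int, ok bool) int {
	p := count + poolPenalty
	if ok {
		p -= lastCount
	}
	return p
}

func main() {
	fmt.Println(penalty(5, 3, 2, true))  // 6 == 5 - 2 + 3 (old "ok" branch)
	fmt.Println(penalty(5, 3, 0, false)) // 8 == 5 + 3     (old "else" branch)
}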

@@ -228,19 +225,24 @@ func newAICapabilities(cap core.Capability, modelID string, warm bool, minVersio
 	return caps
 }

+// SelectorIsEmpty returns true if no orchestrators are in the warm or cold pools.
+func (sel *AISessionSelector) SelectorIsEmpty() bool {
+	return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0
+}
+
 func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
 	shouldRefreshSelector := func() bool {

 		discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))

+		// If the selector is empty, release all orchestrators from suspension and
+		// try refresh.
 		if sel.SelectorIsEmpty() {
-			// release all orchestrators from suspension and try refresh
-			// if there are no orchestrators in the pools
 			clog.Infof(ctx, "refreshing sessions, no orchestrators in pools")
 			for i := 0; i < sel.penalty; i++ {
 				sel.suspender.signalRefresh()
 			}
 		}

 		// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
 		// 1/2 the total # of orchs that can be queried during discovery
 		if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
@@ -275,10 +277,6 @@ func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
 	return nil
 }

-func (sel *AISessionSelector) SelectorIsEmpty() bool {
-	return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0
-}
-
 func (sel *AISessionSelector) Complete(sess *AISession) {
 	if sess.Warm {
 		sel.warmPool.Complete(sess.BroadcastSession)
@@ -307,20 +305,20 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {

 	var warmSessions []*BroadcastSession
 	var coldSessions []*BroadcastSession

 	for _, sess := range sessions {
 		// If the constraints are missing for this capability skip this session
 		constraints, ok := sess.OrchestratorInfo.Capabilities.Constraints.PerCapability[uint32(sel.cap)]
 		if !ok {
 			continue
 		}

-		// this should not be needed, the GetOrchestrators checks for suspension but was seeing orchestrators get back into
-		// the pool that were suspended
+		// Should not be needed but suspended orchestrators seem to get back in the pool.
+		// TODO: Investigate why suspended orchestrators get back in the pool.
 		if sel.suspender.Suspended(sess.Transcoder()) > 0 {
-			clog.Infof(ctx, "skipping suspended orchestrator=%s", sess.Transcoder())
+			clog.V(common.DEBUG).Infof(ctx, "skipping suspended orchestrator=%s", sess.Transcoder())
 			continue
 		}

 		// If the constraint for the modelID are missing skip this session
 		modelConstraint, ok := constraints.Models[sel.modelID]
 		if !ok {
@@ -407,7 +405,7 @@ func (c *AISessionManager) Select(ctx context.Context, cap core.Capability, mode
 		return nil, err
 	}

-	clog.Infof(ctx, "selected orchestrator=%s", sess.Transcoder())
+	clog.V(common.DEBUG).Infof(ctx, "selected orchestrator=%s", sess.Transcoder())

 	return sess, nil
 }
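For reference, the refresh condition in Select (unchanged by this PR) fires when the combined warm and cold pool size drops below min(maxRefreshSessionsThreshold, ceil(discoveryPoolSize/2)). A worked sketch; the threshold value of 8 is made up, and the math and fmt imports are assumed:

discoveryPoolSize := 6
threshold := int(math.Min(8, math.Ceil(float64(discoveryPoolSize)/2.0))) // min(8, 3) = 3
fmt.Println(threshold) // a refresh triggers once fewer than 3 sessions remain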
server/handlers.go (81 changes: 50 additions & 31 deletions)
@@ -296,42 +296,55 @@ func getAvailableTranscodingOptionsHandler() http.Handler {
 	})
 }

+// poolOrchestrator contains information about an orchestrator in a pool.
+type poolOrchestrator struct {
+	Url          string  `json:"url"`
+	LatencyScore float64 `json:"latency_score"`
+	InFlight     int     `json:"in_flight"`
+}
+
+// aiPoolInfo contains information about an AI pool.
+type aiPoolInfo struct {
+	Size          int                `json:"size"`
+	InUse         int                `json:"in_use"`
+	Orchestrators []poolOrchestrator `json:"orchestrators"`
+}
+
+// suspendedInfo contains information about suspended orchestrators.
+type suspendedInfo struct {
+	List         map[string]int `json:"list"`
+	CurrentCount int            `json:"current_count"`
+}
+
+// aiOrchestratorPools contains information about all AI pools.
+type aiOrchestratorPools struct {
+	Cold        aiPoolInfo    `json:"cold"`
+	Warm        aiPoolInfo    `json:"warm"`
+	LastRefresh time.Time     `json:"last_refresh"`
+	Suspended   suspendedInfo `json:"suspended"`
+}
+
+// getAIPoolsInfoHandler returns information about the AI orchestrator pools.
 func (s *LivepeerServer) getAIPoolsInfoHandler() http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		aiPoolInfoResp := make(map[string]interface{})
+		glog.V(common.DEBUG).Infof("getting AI pool info for %d selectors", len(s.AISessionManager.selectors))
+		aiPoolsInfoResp := make(map[string]aiOrchestratorPools)

 		s.AISessionManager.mu.Lock()
 		defer s.AISessionManager.mu.Unlock()

-		type poolOrchestrator struct {
-			Url          string  `json:"url"`
-			LatencyScore float64 `json:"latency_score"`
-			InFlight     int     `json:"in_flight"`
-		}
-		type aiPoolInfo struct {
-			Size          int                `json:"size"`
-			InUse         int                `json:"in_use"`
-			Orchestrators []poolOrchestrator `json:"orchestrators"`
-		}
-		type aiOrchestratorPool struct {
-			Cold         aiPoolInfo     `json:"cold"`
-			Warm         aiPoolInfo     `json:"warm"`
-			LastRefresh  time.Time      `json:"last_refresh"`
-			Suspended    map[string]int `json:"suspended"`
-			CurrentCount int            `json:"current_count"`
-		}
-
+		// Return if no selectors are present.
+		if len(s.AISessionManager.selectors) == 0 {
+			glog.Warning("Orchestrator pools are not yet initialized")
+			respondJson(w, aiPoolsInfoResp)
+			return
+		}
+
+		// Loop through selectors and get pools info.
 		for cap, pool := range s.AISessionManager.selectors {
 			warmPool := aiPoolInfo{
 				Size:  pool.warmPool.Size(),
 				InUse: len(pool.warmPool.inUseSess),
 			}
+			coldPool := aiPoolInfo{
+				Size:  pool.coldPool.Size(),
+				InUse: len(pool.coldPool.inUseSess),
+			}

 			for _, sess := range pool.warmPool.sessMap {
 				poolOrchestrator := poolOrchestrator{
 					Url: sess.Transcoder(),
@@ -341,21 +354,27 @@ func (s *LivepeerServer) getAIPoolsInfoHandler() http.Handler {
 				warmPool.Orchestrators = append(warmPool.Orchestrators, poolOrchestrator)
 			}

-			coldPool := aiPoolInfo{
-				Size:  pool.coldPool.Size(),
-				InUse: len(pool.coldPool.inUseSess),
-			}
 			for _, sess := range pool.coldPool.sessMap {
-				poolOrchestrator := poolOrchestrator{
+				coldPool.Orchestrators = append(coldPool.Orchestrators, poolOrchestrator{
 					Url:          sess.Transcoder(),
 					LatencyScore: sess.LatencyScore,
 					InFlight:     len(sess.SegsInFlight),
-				}
-				coldPool.Orchestrators = append(coldPool.Orchestrators, poolOrchestrator)
+				})
 			}

-			selectorPools := aiOrchestratorPool{Cold: coldPool, Warm: warmPool, Suspended: pool.suspender.list, CurrentCount: pool.suspender.count, LastRefresh: pool.lastRefreshTime}
-			aiPoolInfoResp[cap] = selectorPools
+			aiPoolsInfoResp[cap] = aiOrchestratorPools{
+				Cold:        coldPool,
+				Warm:        warmPool,
+				LastRefresh: pool.lastRefreshTime,
+				Suspended:   suspendedInfo{List: pool.suspender.list, CurrentCount: pool.suspender.count},
+			}
 		}

-		respondJson(w, aiPoolInfoResp)
+		glog.V(common.DEBUG).Infof("sending AI pool info for %d selectors", len(s.AISessionManager.selectors))
+		respondJson(w, aiPoolsInfoResp)
 	})
 }
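With the response types promoted to package scope, the endpoint's JSON shape follows directly from the struct tags. A sketch of building a sample payload; it assumes encoding/json and time are imported, and the capability key, URL, and latency score are made up:

pools := map[string]aiOrchestratorPools{
	"example-capability": {
		Warm: aiPoolInfo{
			Size: 1,
			Orchestrators: []poolOrchestrator{
				{Url: "https://orch.example:8935", LatencyScore: 0.81, InFlight: 0},
			},
		},
		LastRefresh: time.Now(),
		Suspended:   suspendedInfo{List: map[string]int{}, CurrentCount: 0},
	},
}
b, _ := json.MarshalIndent(pools, "", "  ")
fmt.Println(string(b)) // per-capability keys: "cold", "warm", "last_refresh", "suspended"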
