refactor: some minor code changes (#32)
* refactor: some minor code changes
* refactor: fix transient error naming
* refactor: make isRetryableError case insensitive
rickstaa authored Feb 13, 2025
1 parent 7636d8e commit 8091750
Showing 5 changed files with 120 additions and 52 deletions.
4 changes: 2 additions & 2 deletions server/ai_http.go
@@ -119,7 +119,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {

		// Check if there is capacity for the request
		if !orch.CheckAICapacity(pipeline, modelID) {
-			respondWithError(w, fmt.Sprintf("insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
+			respondWithError(w, fmt.Sprintf("Insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
			return
		}

@@ -480,7 +480,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request

	// Check if there is capacity for the request.
	if !orch.CheckAICapacity(pipeline, modelID) {
-		respondWithError(w, fmt.Sprintf("insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
+		respondWithError(w, fmt.Sprintf("Insufficient capacity for pipeline=%v modelID=%v", pipeline, modelID), http.StatusServiceUnavailable)
		return
	}

22 changes: 20 additions & 2 deletions server/ai_process.go
@@ -1356,6 +1356,23 @@ func processImageToText(ctx context.Context, params aiRequestParams, req worker.
	return txtResp, nil
}

+// isRetryableError checks if the error is a transient error that can be retried.
+func isRetryableError(err error) bool {
+	transientErrorMessages := []string{
+		"insufficient capacity",      // Caused by limitation in our current implementation.
+		"invalid ticket sendernonce", // Caused by gateway nonce mismatch.
+		"ticketparams expired",       // Caused by ticket expiration.
+	}
+
+	errMsg := strings.ToLower(err.Error())
+	for _, msg := range transientErrorMessages {
+		if strings.Contains(errMsg, msg) {
+			return true
+		}
+	}
+	return false
+}

func processAIRequest(ctx context.Context, params aiRequestParams, req interface{}) (interface{}, error) {
var cap core.Capability
var modelID string
@@ -1503,12 +1520,13 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
			break
		}

-		//errors that do not result in suspending the orchestrator
-		if strings.Contains(err.Error(), "insufficient capacity") || strings.Contains(err.Error(), "invalid ticket senderNonce") {
+		// Don't suspend the session if the error is a transient error.
+		if isRetryableError(err) {
			params.sessManager.Complete(ctx, sess)
			continue
		}

+		// Suspend the session on other errors.
		clog.Infof(ctx, "Error submitting request modelID=%v try=%v orch=%v err=%v", modelID, tries, sess.Transcoder(), err)
		params.sessManager.Remove(ctx, sess) //TODO: Improve session selection logic for live-video-to-video

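For illustration, a self-contained sketch of what the new classification buys: isRetryableError is copied verbatim from the hunk above, while the sample error strings and the main function are invented for the demo. Because both the error message and the match list are lowercased, casing differences no longer matter, and wrapped errors still match since Error() includes the wrapped message.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// isRetryableError checks if the error is a transient error that can be retried.
// (Copied from server/ai_process.go in this commit.)
func isRetryableError(err error) bool {
	transientErrorMessages := []string{
		"insufficient capacity",      // Caused by limitation in our current implementation.
		"invalid ticket sendernonce", // Caused by gateway nonce mismatch.
		"ticketparams expired",       // Caused by ticket expiration.
	}

	errMsg := strings.ToLower(err.Error())
	for _, msg := range transientErrorMessages {
		if strings.Contains(errMsg, msg) {
			return true
		}
	}
	return false
}

func main() {
	// Case differences no longer matter: both sides are lowercased before matching.
	fmt.Println(isRetryableError(errors.New("Insufficient capacity for pipeline=x modelID=y"))) // true
	// Wrapped errors still match, since Error() includes the wrapped message.
	err := fmt.Errorf("submit failed: %w", errors.New("invalid ticket senderNonce"))
	fmt.Println(isRetryableError(err))                              // true
	fmt.Println(isRetryableError(errors.New("connection refused"))) // false
}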
33 changes: 33 additions & 0 deletions server/ai_process_test.go
@@ -2,6 +2,7 @@ package server

 import (
 	"context"
+	"errors"
 	"reflect"
 	"testing"

@@ -88,6 +89,38 @@ func Test_submitAudioToText(t *testing.T) {
	}
}

+func Test_isRetryableError(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{
+			name: "insufficient capacity error",
+			err:  errors.New("Insufficient capacity"),
+			want: true,
+		},
+		{
+			name: "INSUFFICIENT capacity ERROR",
+			err:  errors.New("INSUFFICIENT capacity ERROR"),
+			want: true,
+		},
+		{
+			name: "non-retryable error",
+			err:  errors.New("some other error"),
+			want: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := isRetryableError(tt.err); got != tt.want {
+				t.Errorf("isRetryableError() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

func TestEncodeReqMetadata(t *testing.T) {
tests := []struct {
name string
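Note that the second case originally reused the lowercase "Insufficient capacity" message, so it never actually exercised the case-insensitive path; the error string above has been aligned with the test name. Assuming a standard Go toolchain layout, the table test can be run in isolation with: go test ./server -run Test_isRetryableError -v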
32 changes: 15 additions & 17 deletions server/ai_session.go
@@ -124,17 +124,14 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
	delete(pool.sessMap, sess.Transcoder())
	pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)

-	penalty := 0
	// If this method is called assume that the orch should be suspended
	// as well. Since AISessionManager re-uses the pools the suspension
	// penalty needs to consider the current suspender count to set the penalty
	lastCount, ok := pool.suspender.list[sess.Transcoder()]
+	penalty := pool.suspender.count + pool.penalty
	if ok {
-		penalty = pool.suspender.count - lastCount + pool.penalty
-	} else {
-		penalty = pool.suspender.count + pool.penalty
+		penalty -= lastCount
	}

	pool.suspender.suspend(sess.Transcoder(), penalty)
}

@@ -228,19 +225,24 @@ func newAICapabilities(cap core.Capability, modelID string, warm bool, minVersio
	return caps
}

+// SelectorIsEmpty returns true if no orchestrators are in the warm or cold pools.
+func (sel *AISessionSelector) SelectorIsEmpty() bool {
+	return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0
+}

func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
shouldRefreshSelector := func() bool {

discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))

+		// If the selector is empty, release all orchestrators from suspension and
+		// try refresh.
		if sel.SelectorIsEmpty() {
-			// release all orchestrators from suspension and try refresh
-			// if there are no orchestrators in the pools
			clog.Infof(ctx, "refreshing sessions, no orchestrators in pools")
			for i := 0; i < sel.penalty; i++ {
				sel.suspender.signalRefresh()
			}
		}

// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
// 1/2 the total # of orchs that can be queried during discovery
if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
@@ -275,10 +277,6 @@ func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
	return nil
}

-func (sel *AISessionSelector) SelectorIsEmpty() bool {
-	return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0
-}

func (sel *AISessionSelector) Complete(sess *AISession) {
if sess.Warm {
sel.warmPool.Complete(sess.BroadcastSession)
@@ -307,20 +305,20 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {

	var warmSessions []*BroadcastSession
	var coldSessions []*BroadcastSession

	for _, sess := range sessions {
		// If the constraints are missing for this capability skip this session
		constraints, ok := sess.OrchestratorInfo.Capabilities.Constraints.PerCapability[uint32(sel.cap)]
		if !ok {
			continue
		}

-		// this should not be needed, the GetOrchestrators checks for suspension but was seeing orchestrators get back into
-		// the pool that were suspended
+		// Should not be needed but suspended orchestrators seem to get back in the pool.
+		// TODO: Investigate why suspended orchestrators get back in the pool.
		if sel.suspender.Suspended(sess.Transcoder()) > 0 {
-			clog.Infof(ctx, "skipping suspended orchestrator=%s", sess.Transcoder())
+			clog.V(common.DEBUG).Infof(ctx, "skipping suspended orchestrator=%s", sess.Transcoder())
			continue
		}

		// If the constraints for the modelID are missing skip this session
		modelConstraint, ok := constraints.Models[sel.modelID]
		if !ok {
Expand Down Expand Up @@ -407,7 +405,7 @@ func (c *AISessionManager) Select(ctx context.Context, cap core.Capability, mode
return nil, err
}

clog.Infof(ctx, "selected orchestrator=%s", sess.Transcoder())
clog.V(common.DEBUG).Infof(ctx, "selected orchestrator=%s", sess.Transcoder())

return sess, nil
}
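The reworked penalty computation in Remove is equivalent to the old if/else branch: a map lookup that misses yields lastCount == 0 along with ok == false, so subtracting nothing leaves penalty at count + penalty, exactly as before. A standalone sketch of the arithmetic (the field names mirror the suspender in the diff; all values are made up for the demo):

package main

import "fmt"

func main() {
	suspenderCount := 5                // current suspender refresh count (made-up value)
	poolPenalty := 3                   // pool's configured penalty (made-up value)
	list := map[string]int{"orchA": 4} // count recorded when orchA was last suspended

	for _, orch := range []string{"orchA", "orchB"} {
		lastCount, ok := list[orch]
		penalty := suspenderCount + poolPenalty
		if ok {
			penalty -= lastCount
		}
		// Prints: orchA penalty=4 (5+3-4), then orchB penalty=8 (5+3).
		fmt.Printf("%s penalty=%d\n", orch, penalty)
	}
}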
81 changes: 50 additions & 31 deletions server/handlers.go
@@ -296,42 +296,55 @@ func getAvailableTranscodingOptionsHandler() http.Handler {
	})
}

+// poolOrchestrator contains information about an orchestrator in a pool.
+type poolOrchestrator struct {
+	Url          string  `json:"url"`
+	LatencyScore float64 `json:"latency_score"`
+	InFlight     int     `json:"in_flight"`
+}
+
+// aiPoolInfo contains information about an AI pool.
+type aiPoolInfo struct {
+	Size          int                `json:"size"`
+	InUse         int                `json:"in_use"`
+	Orchestrators []poolOrchestrator `json:"orchestrators"`
+}
+
+// suspendedInfo contains information about suspended orchestrators.
+type suspendedInfo struct {
+	List         map[string]int `json:"list"`
+	CurrentCount int            `json:"current_count"`
+}
+
+// aiOrchestratorPools contains information about all AI pools.
+type aiOrchestratorPools struct {
+	Cold        aiPoolInfo    `json:"cold"`
+	Warm        aiPoolInfo    `json:"warm"`
+	LastRefresh time.Time     `json:"last_refresh"`
+	Suspended   suspendedInfo `json:"suspended"`
+}

+// getAIPoolsInfoHandler returns information about the AI orchestrator pools.
 func (s *LivepeerServer) getAIPoolsInfoHandler() http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		aiPoolInfoResp := make(map[string]interface{})
+		glog.V(common.DEBUG).Infof("getting AI pool info for %d selectors", len(s.AISessionManager.selectors))
+		aiPoolsInfoResp := make(map[string]aiOrchestratorPools)

s.AISessionManager.mu.Lock()
defer s.AISessionManager.mu.Unlock()

-		type poolOrchestrator struct {
-			Url          string  `json:"url"`
-			LatencyScore float64 `json:"latency_score"`
-			InFlight     int     `json:"in_flight"`
-		}
-		type aiPoolInfo struct {
-			Size          int                `json:"size"`
-			InUse         int                `json:"in_use"`
-			Orchestrators []poolOrchestrator `json:"orchestrators"`
-		}
-
-		type aiOrchestratorPool struct {
-			Cold         aiPoolInfo     `json:"cold"`
-			Warm         aiPoolInfo     `json:"warm"`
-			LastRefresh  time.Time      `json:"last_refresh"`
-			Suspended    map[string]int `json:"suspended"`
-			CurrentCount int            `json:"current_count"`
-		}
+		// Return if no selectors are present.
+		if len(s.AISessionManager.selectors) == 0 {
+			glog.Warning("Orchestrator pools are not yet initialized")
+			respondJson(w, aiPoolsInfoResp)
+			return
+		}

+		// Loop through selectors and get pools info.
		for cap, pool := range s.AISessionManager.selectors {
			warmPool := aiPoolInfo{
				Size:  pool.warmPool.Size(),
				InUse: len(pool.warmPool.inUseSess),
			}
+			coldPool := aiPoolInfo{
+				Size:  pool.coldPool.Size(),
+				InUse: len(pool.coldPool.inUseSess),
+			}

for _, sess := range pool.warmPool.sessMap {
poolOrchestrator := poolOrchestrator{
Url: sess.Transcoder(),
@@ -341,21 +354,27 @@ func (s *LivepeerServer) getAIPoolsInfoHandler() http.Handler {
warmPool.Orchestrators = append(warmPool.Orchestrators, poolOrchestrator)
}

-			coldPool := aiPoolInfo{
-				Size:  pool.coldPool.Size(),
-				InUse: len(pool.coldPool.inUseSess),
-			}
			for _, sess := range pool.coldPool.sessMap {
-				poolOrchestrator := poolOrchestrator{
+				coldPool.Orchestrators = append(coldPool.Orchestrators, poolOrchestrator{
					Url:          sess.Transcoder(),
					LatencyScore: sess.LatencyScore,
					InFlight:     len(sess.SegsInFlight),
-				}
-				coldPool.Orchestrators = append(coldPool.Orchestrators, poolOrchestrator)
+				})
			}

-			selectorPools := aiOrchestratorPool{Cold: coldPool, Warm: warmPool, Suspended: pool.suspender.list, CurrentCount: pool.suspender.count, LastRefresh: pool.lastRefreshTime}
-			aiPoolInfoResp[cap] = selectorPools
+			aiPoolsInfoResp[cap] = aiOrchestratorPools{
+				Cold:        coldPool,
+				Warm:        warmPool,
+				LastRefresh: pool.lastRefreshTime,
+				Suspended:   suspendedInfo{List: pool.suspender.list, CurrentCount: pool.suspender.count},
+			}
}

+		glog.V(common.DEBUG).Infof("sending AI pool info for %d selectors", len(s.AISessionManager.selectors))
-		respondJson(w, aiPoolInfoResp)
+		respondJson(w, aiPoolsInfoResp)
})
}

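Based on the struct tags introduced above, a response from this endpoint would be shaped roughly like the following. The values are illustrative only, and the top-level key is assumed to be the selector's capability/model identifier as used by AISessionManager:

{
  "live-video-to-video": {
    "cold": { "size": 0, "in_use": 0, "orchestrators": null },
    "warm": {
      "size": 2,
      "in_use": 1,
      "orchestrators": [
        { "url": "https://orch.example:8935", "latency_score": 0.87, "in_flight": 1 }
      ]
    },
    "last_refresh": "2025-02-13T12:00:00Z",
    "suspended": { "list": { "https://bad-orch.example:8935": 7 }, "current_count": 7 }
  }
}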
