From 7b0397e8f932e0e2adb39905b0134d8470a768af Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Tue, 26 Oct 2021 12:44:05 -0400 Subject: [PATCH 1/4] common: Add UpdatedLastDay field to DBOrchFilter Also refactor buildFilterOrchsQuery --- common/db.go | 26 +++++++++++++++++++------- common/db_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/common/db.go b/common/db.go index d433f56e94..3cea8ed502 100644 --- a/common/db.go +++ b/common/db.go @@ -63,9 +63,10 @@ type DBUnbondingLock struct { // DBOrchFilter is an object used to attach a filter to a selectOrch query type DBOrchFilter struct { - MaxPrice *big.Rat - CurrentRound *big.Int - Addresses []ethcommon.Address + MaxPrice *big.Rat + CurrentRound *big.Int + Addresses []ethcommon.Address + UpdatedLastDay bool } var LivepeerDBVersion = 1 @@ -822,19 +823,25 @@ func buildOrchCountQuery(filter *DBOrchFilter) (string, error) { } func buildFilterOrchsQuery(filter *DBOrchFilter) (string, error) { - qry := "WHERE updatedAt >= datetime('now','-1 day')" + qry := "" + var filters []string + if filter != nil { + if filter.UpdatedLastDay { + filters = append(filters, "updatedAt >= datetime('now','-1 day')") + } + if filter.MaxPrice != nil { fixedPrice, err := PriceToFixed(filter.MaxPrice) if err != nil { return "", err } - qry += " AND pricePerPixel <= " + strconv.FormatInt(fixedPrice, 10) + filters = append(filters, "pricePerPixel <= "+strconv.FormatInt(fixedPrice, 10)) } if filter.CurrentRound != nil { currentRound := filter.CurrentRound.Int64() - qry += fmt.Sprintf(" AND activationRound <= %v AND %v < deactivationRound", currentRound, currentRound) + filters = append(filters, fmt.Sprintf("activationRound <= %v AND %v < deactivationRound", currentRound, currentRound)) } if len(filter.Addresses) > 0 { @@ -842,9 +849,14 @@ func buildFilterOrchsQuery(filter *DBOrchFilter) (string, error) { for i, addr := range filter.Addresses { hexAddrs[i] = fmt.Sprintf("'%v'", addr.Hex()) } - qry += fmt.Sprintf(" AND ethereumAddr IN (%v)", strings.Join(hexAddrs, ", ")) + filters = append(filters, fmt.Sprintf("ethereumAddr IN (%v)", strings.Join(hexAddrs, ", "))) } } + + if len(filters) > 0 { + qry = "WHERE " + strings.Join(filters, " AND ") + } + return qry, nil } diff --git a/common/db_test.go b/common/db_test.go index 945356ad8f..ab5f5b74c9 100644 --- a/common/db_test.go +++ b/common/db_test.go @@ -536,6 +536,32 @@ func TestDBFilterOrchs(t *testing.T) { orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{Addresses: filterAddrs}) assert.Nil(err) assert.Len(orchsFiltered, 0) + + // Select all orchs if filter.UpdatedLastDay = false + orchsFiltered, err = dbh.SelectOrchs(nil) + assert.Nil(err) + assert.Len(orchsFiltered, 10) + + orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{}) + assert.Nil(err) + assert.Len(orchsFiltered, 10) + + orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{Addresses: []ethcommon.Address{ethcommon.HexToAddress(orchAddrList[0])}}) + assert.Nil(err) + assert.Len(orchsFiltered, 1) + + // Select orchs that have been updated in the last day if filter.UpdatedLastDay = true + stmt := fmt.Sprintf("UPDATE orchestrators SET updatedAt=datetime('now','-2 day') WHERE ethereumAddr='%v'", orchAddrList[0]) + _, err = dbraw.Exec(stmt) + require.Nil(err) + + orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{UpdatedLastDay: true}) + assert.Nil(err) + assert.Len(orchsFiltered, 9) + + orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{UpdatedLastDay: true, Addresses: []ethcommon.Address{ethcommon.HexToAddress(orchAddrList[0])}}) + assert.Nil(err) + assert.Len(orchsFiltered, 0) } func TestDBUnbondingLocks(t *testing.T) { From 044b851c587761c837ab00783516431f71300eee Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Tue, 26 Oct 2021 12:45:10 -0400 Subject: [PATCH 2/4] discovery: Set LastUpdatedDay = true for DB orch pool --- discovery/db_discovery.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/discovery/db_discovery.go b/discovery/db_discovery.go index cabb240a1c..d34cc9729f 100644 --- a/discovery/db_discovery.go +++ b/discovery/db_discovery.go @@ -68,8 +68,9 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm func (dbo *DBOrchestratorPoolCache) getURLs() ([]*url.URL, error) { orchs, err := dbo.store.SelectOrchs( &common.DBOrchFilter{ - MaxPrice: server.BroadcastCfg.MaxPrice(), - CurrentRound: dbo.rm.LastInitializedRound(), + MaxPrice: server.BroadcastCfg.MaxPrice(), + CurrentRound: dbo.rm.LastInitializedRound(), + UpdatedLastDay: true, }, ) if err != nil || len(orchs) <= 0 { @@ -144,8 +145,9 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(numOrchestrators int, suspe func (dbo *DBOrchestratorPoolCache) Size() int { count, _ := dbo.store.OrchCount( &common.DBOrchFilter{ - MaxPrice: server.BroadcastCfg.MaxPrice(), - CurrentRound: dbo.rm.LastInitializedRound(), + MaxPrice: server.BroadcastCfg.MaxPrice(), + CurrentRound: dbo.rm.LastInitializedRound(), + UpdatedLastDay: true, }, ) return count From 7dbc5d3d794383bc0d19fcb6a4677e6d3e279c69 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Tue, 26 Oct 2021 12:50:48 -0400 Subject: [PATCH 3/4] core: Add additional logging around isActive check --- core/orch_test.go | 2 +- core/orchestrator.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/orch_test.go b/core/orch_test.go index 7aefef4eca..bf8e23a0ab 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -798,7 +798,7 @@ func TestProcessPayment_ActiveOrchestrator(t *testing.T) { // orchestrator inactive -> error err := orch.ProcessPayment(defaultPayment(t), ManifestID("some manifest")) - expErr := fmt.Sprintf("orchestrator is inactive, cannot process payments") + expErr := fmt.Sprintf("orchestrator %v is inactive in round %v, cannot process payments", addr.Hex(), 10) assert.EqualError(err, expErr) // orchestrator is active -> no error diff --git a/core/orchestrator.go b/core/orchestrator.go index f7b34ee473..2c0c927c7c 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -122,13 +122,14 @@ func (orch *orchestrator) ProcessPayment(payment net.Payment, manifestID Manifes sender := ethcommon.BytesToAddress(payment.Sender) - ok, err := orch.isActive(ethcommon.BytesToAddress(payment.TicketParams.Recipient)) + recipientAddr := ethcommon.BytesToAddress(payment.TicketParams.Recipient) + ok, err := orch.isActive(recipientAddr) if err != nil { return err } if !ok { - return fmt.Errorf("orchestrator is inactive, cannot process payments") + return fmt.Errorf("orchestrator %v is inactive in round %v, cannot process payments", recipientAddr.Hex(), orch.rm.LastInitializedRound()) } priceInfo := payment.GetExpectedPrice() From 2ae79b6d1767c2a832af4c43cadc944fd14b043d Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Tue, 26 Oct 2021 12:59:15 -0400 Subject: [PATCH 4/4] CHANGELOG_PENDING: Optional orch filtering past day --- CHANGELOG_PENDING.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index fbabf7e629..e63a79162c 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -9,6 +9,7 @@ #### General - \#2013 Add support for EIP-1559 transactions (@yondonfu) +- \#2073 Make filtering orchestrators in the DB that haven't been updated in last day optional (@yondonfu) #### Broadcaster