Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make filtering orchestrators that haven't been updated in last day optional #2073

Merged
merged 4 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### General

- \#2013 Add support for EIP-1559 transactions (@yondonfu)
- \#2073 Make filtering orchestrators in the DB that haven't been updated in last day optional (@yondonfu)

#### Broadcaster

Expand Down
26 changes: 19 additions & 7 deletions common/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ type DBUnbondingLock struct {

// DBOrchFilter is an object used to attach a filter to a selectOrch query
type DBOrchFilter struct {
MaxPrice *big.Rat
CurrentRound *big.Int
Addresses []ethcommon.Address
MaxPrice *big.Rat
CurrentRound *big.Int
Addresses []ethcommon.Address
UpdatedLastDay bool
}

var LivepeerDBVersion = 1
Expand Down Expand Up @@ -822,29 +823,40 @@ func buildOrchCountQuery(filter *DBOrchFilter) (string, error) {
}

func buildFilterOrchsQuery(filter *DBOrchFilter) (string, error) {
qry := "WHERE updatedAt >= datetime('now','-1 day')"
qry := ""
var filters []string
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to joining a slice of strings with the " AND " delimiter because it is possible that we end up with a query without a WHERE clause or any ANDs.


if filter != nil {
if filter.UpdatedLastDay {
filters = append(filters, "updatedAt >= datetime('now','-1 day')")
}

if filter.MaxPrice != nil {
fixedPrice, err := PriceToFixed(filter.MaxPrice)
if err != nil {
return "", err
}
qry += " AND pricePerPixel <= " + strconv.FormatInt(fixedPrice, 10)
filters = append(filters, "pricePerPixel <= "+strconv.FormatInt(fixedPrice, 10))
}

if filter.CurrentRound != nil {
currentRound := filter.CurrentRound.Int64()
qry += fmt.Sprintf(" AND activationRound <= %v AND %v < deactivationRound", currentRound, currentRound)
filters = append(filters, fmt.Sprintf("activationRound <= %v AND %v < deactivationRound", currentRound, currentRound))
}

if len(filter.Addresses) > 0 {
hexAddrs := make([]string, len(filter.Addresses))
for i, addr := range filter.Addresses {
hexAddrs[i] = fmt.Sprintf("'%v'", addr.Hex())
}
qry += fmt.Sprintf(" AND ethereumAddr IN (%v)", strings.Join(hexAddrs, ", "))
filters = append(filters, fmt.Sprintf("ethereumAddr IN (%v)", strings.Join(hexAddrs, ", ")))
}
}

if len(filters) > 0 {
qry = "WHERE " + strings.Join(filters, " AND ")
}

return qry, nil
}

Expand Down
26 changes: 26 additions & 0 deletions common/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,32 @@ func TestDBFilterOrchs(t *testing.T) {
orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{Addresses: filterAddrs})
assert.Nil(err)
assert.Len(orchsFiltered, 0)

// Select all orchs if filter.UpdatedLastDay = false
orchsFiltered, err = dbh.SelectOrchs(nil)
assert.Nil(err)
assert.Len(orchsFiltered, 10)

orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{})
assert.Nil(err)
assert.Len(orchsFiltered, 10)

orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{Addresses: []ethcommon.Address{ethcommon.HexToAddress(orchAddrList[0])}})
assert.Nil(err)
assert.Len(orchsFiltered, 1)

// Select orchs that have been updated in the last day if filter.UpdatedLastDay = true
stmt := fmt.Sprintf("UPDATE orchestrators SET updatedAt=datetime('now','-2 day') WHERE ethereumAddr='%v'", orchAddrList[0])
_, err = dbraw.Exec(stmt)
require.Nil(err)

orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{UpdatedLastDay: true})
assert.Nil(err)
assert.Len(orchsFiltered, 9)

orchsFiltered, err = dbh.SelectOrchs(&DBOrchFilter{UpdatedLastDay: true, Addresses: []ethcommon.Address{ethcommon.HexToAddress(orchAddrList[0])}})
assert.Nil(err)
assert.Len(orchsFiltered, 0)
}

func TestDBUnbondingLocks(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ func TestProcessPayment_ActiveOrchestrator(t *testing.T) {

// orchestrator inactive -> error
err := orch.ProcessPayment(defaultPayment(t), ManifestID("some manifest"))
expErr := fmt.Sprintf("orchestrator is inactive, cannot process payments")
expErr := fmt.Sprintf("orchestrator %v is inactive in round %v, cannot process payments", addr.Hex(), 10)
assert.EqualError(err, expErr)

// orchestrator is active -> no error
Expand Down
5 changes: 3 additions & 2 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ func (orch *orchestrator) ProcessPayment(payment net.Payment, manifestID Manifes

sender := ethcommon.BytesToAddress(payment.Sender)

ok, err := orch.isActive(ethcommon.BytesToAddress(payment.TicketParams.Recipient))
recipientAddr := ethcommon.BytesToAddress(payment.TicketParams.Recipient)
ok, err := orch.isActive(recipientAddr)
if err != nil {
return err
}

if !ok {
return fmt.Errorf("orchestrator is inactive, cannot process payments")
return fmt.Errorf("orchestrator %v is inactive in round %v, cannot process payments", recipientAddr.Hex(), orch.rm.LastInitializedRound())
}

priceInfo := payment.GetExpectedPrice()
Expand Down
10 changes: 6 additions & 4 deletions discovery/db_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
func (dbo *DBOrchestratorPoolCache) getURLs() ([]*url.URL, error) {
orchs, err := dbo.store.SelectOrchs(
&common.DBOrchFilter{
MaxPrice: server.BroadcastCfg.MaxPrice(),
CurrentRound: dbo.rm.LastInitializedRound(),
MaxPrice: server.BroadcastCfg.MaxPrice(),
CurrentRound: dbo.rm.LastInitializedRound(),
UpdatedLastDay: true,
},
)
if err != nil || len(orchs) <= 0 {
Expand Down Expand Up @@ -144,8 +145,9 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(numOrchestrators int, suspe
func (dbo *DBOrchestratorPoolCache) Size() int {
count, _ := dbo.store.OrchCount(
&common.DBOrchFilter{
MaxPrice: server.BroadcastCfg.MaxPrice(),
CurrentRound: dbo.rm.LastInitializedRound(),
MaxPrice: server.BroadcastCfg.MaxPrice(),
CurrentRound: dbo.rm.LastInitializedRound(),
UpdatedLastDay: true,
},
)
return count
Expand Down