Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/livepeer: Use price feed watcher for dynamic pricePerPixel #2981

Merged
merged 34 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
87d9992
eth/watchers: Create PriceFeed watcher
victorges Mar 6, 2024
ae07a11
eth: Create separate pricefeed client unit
victorges Mar 14, 2024
84b8d35
eth: Add tests for pricefeed client
victorges Mar 20, 2024
4db8153
eth/watchers: Add tests to the truncated ticker
victorges Mar 20, 2024
59450bf
eth/watchers: Add tests for pricefeedwatcher
victorges Mar 21, 2024
8f02cde
eth: Add comments to the new components
victorges Mar 21, 2024
a50f4ea
go fmt
victorges Mar 21, 2024
80d9c0e
cmd: make pricePerUnit flags strings
victorges Mar 21, 2024
d59c49f
cmd: Allow price per unit to be speficied with a currency
victorges Mar 21, 2024
2b73576
cmd: Add logic to start price update loop
victorges Mar 21, 2024
339a9eb
cmd: Add flag for specifying price feed address
victorges Mar 21, 2024
d892d3d
cmd: Add a lil test to priceDataToWei
victorges Mar 21, 2024
edbe6d7
TODO: Reminder for something I noticed is missing
victorges Mar 22, 2024
af1239c
cmd/starter: Support currencies for custom broadcaster prices
victorges Mar 22, 2024
ef16826
eth: Address minor review comments
victorges Mar 22, 2024
ed30cee
eth,eth/watchers: Improve pricefeed watcher interface
victorges Mar 22, 2024
aab8e1a
Merge branch 'vg/feat/usd-price-per-unit' into vg/feat/use-usd-price-…
victorges Mar 22, 2024
8b7c4d1
eth/watchers: Fix pricefeed watcher after merge
victorges Mar 22, 2024
87907f2
cmd,core,server: Support dynamic updates to price in USD
victorges Mar 23, 2024
32087b4
eth/watchers: Remove truncated ticker tests
victorges Mar 25, 2024
622e5e8
Merge branch 'vg/feat/usd-price-per-unit' into vg/feat/use-usd-price-…
victorges Mar 25, 2024
5be44db
eth/watchers: Finalize pricefeedwatcher docs/tests
victorges Mar 25, 2024
0d43631
cmd: Address review comment
victorges Mar 25, 2024
cb39078
core: Create tests for autoconvertedprice
victorges Mar 25, 2024
469321e
cmd,core: Move wei default to AutoConvertedPrice
victorges Mar 25, 2024
dae7381
Address review comments
victorges Mar 26, 2024
13bc0a6
cmd: Fix the e2e flow for setting/updating configs
victorges Mar 26, 2024
1b95e65
CHANGELOG
victorges Mar 27, 2024
ed7dc9b
cmd: Make sure pricePerPixel can be specified with e notation
victorges Mar 27, 2024
d62d633
Fix tests
victorges Mar 27, 2024
7215877
Merge branch 'master' into vg/feat/use-usd-price-per-unit
victorges Mar 27, 2024
25dd102
go fmt
victorges Mar 27, 2024
5b7d707
core: Fix typo in comment
victorges Mar 27, 2024
32c6f14
cmd,server: Use 3 decimal points when logging PPP
victorges Mar 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#### General

- [#2981](https://github.com/livepeer/go-livepeer/pull/2981) Add support for prices in custom currencies like USD (@victorges)

#### Broadcaster

#### Orchestrator
Expand Down
11 changes: 6 additions & 5 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.SelectPriceExpFactor = flag.Float64("selectPriceExpFactor", *cfg.SelectPriceExpFactor, "Expresses how significant a small change of price is for the selection algorithm; default 100")
cfg.OrchPerfStatsURL = flag.String("orchPerfStatsUrl", *cfg.OrchPerfStatsURL, "URL of Orchestrator Performance Stream Tester")
cfg.Region = flag.String("region", *cfg.Region, "Region in which a broadcaster is deployed; used to select the region while using the orchestrator's performance stats")
cfg.MaxPricePerUnit = flag.Int("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")
cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
victorges marked this conversation as resolved.
Show resolved Hide resolved
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")

// Transcoding:
Expand Down Expand Up @@ -171,11 +171,12 @@ func parseLivepeerConfig() starter.LivepeerConfig {
// Broadcaster deposit multiplier to determine max acceptable ticket faceValue
cfg.DepositMultiplier = flag.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets")
// Orchestrator base pricing info
cfg.PricePerUnit = flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels")
// Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice
cfg.PixelsPerUnit = flag.Int("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")
cfg.PricePerUnit = flag.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
// Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit
cfg.PixelsPerUnit = flag.String("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")
cfg.PriceFeedAddr = flag.String("priceFeedAddr", *cfg.PriceFeedAddr, "ETH address of the Chainlink price feed contract. Used for custom currencies conversion on -pricePerUnit or -maxPricePerUnit")
cfg.AutoAdjustPrice = flag.Bool("autoAdjustPrice", *cfg.AutoAdjustPrice, "Enable/disable automatic price adjustments based on the overhead for redeeming tickets")
cfg.PricePerBroadcaster = flag.String("pricePerBroadcaster", *cfg.PricePerBroadcaster, `json list of price per broadcaster or path to json config file. Example: {"broadcasters":[{"ethaddress":"address1","priceperunit":1000,"pixelsperunit":1},{"ethaddress":"address2","priceperunit":1200,"pixelsperunit":1}]}`)
cfg.PricePerBroadcaster = flag.String("pricePerBroadcaster", *cfg.PricePerBroadcaster, `json list of price per broadcaster or path to json config file. Example: {"broadcasters":[{"ethaddress":"address1","priceperunit":0.5,"currency":"USD","pixelsperunit":1000000000000},{"ethaddress":"address2","priceperunit":0.3,"currency":"USD","pixelsperunit":1000000000000}]}`)
// Interval to poll for blocks
cfg.BlockPollingInterval = flag.Int("blockPollingInterval", *cfg.BlockPollingInterval, "Interval in seconds at which different blockchain event services poll for blocks")
// Redemption service
Expand Down
163 changes: 128 additions & 35 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"os"
"os/user"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -32,6 +33,7 @@
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/eth/blockwatch"
"github.com/livepeer/go-livepeer/eth/watchers"
"github.com/livepeer/go-livepeer/monitor"
lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/server"
Expand Down Expand Up @@ -61,10 +63,10 @@
)

const (
BroadcasterRpcPort = "9935"

Check warning on line 66 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

const BroadcasterRpcPort should be BroadcasterRPCPort
BroadcasterCliPort = "5935"
BroadcasterRtmpPort = "1935"
OrchestratorRpcPort = "8935"

Check warning on line 69 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

const OrchestratorRpcPort should be OrchestratorRPCPort
OrchestratorCliPort = "7935"
TranscoderCliPort = "6935"

Expand All @@ -75,14 +77,14 @@
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string

Check warning on line 80 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

struct field HttpAddr should be HTTPAddr
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool

Check warning on line 87 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

struct field HttpIngest should be HTTPIngest
Orchestrator *bool
Transcoder *bool
Broadcaster *bool
Expand All @@ -95,7 +97,7 @@
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *int
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Expand All @@ -106,7 +108,7 @@
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string

Check warning on line 111 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

struct field EthUrl should be EthURL
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
Expand All @@ -118,8 +120,9 @@
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *int
PixelsPerUnit *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerBroadcaster *string
BlockPollingInterval *int
Expand All @@ -129,7 +132,7 @@
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string

Check warning on line 135 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

struct field MetadataQueueUri should be MetadataQueueURI
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
Expand Down Expand Up @@ -192,8 +195,9 @@
defaultMaxTicketEV := "3000000000000"
defaultMaxTotalEV := "20000000000000"
defaultDepositMultiplier := 1
defaultMaxPricePerUnit := 0
defaultPixelsPerUnit := 1
defaultMaxPricePerUnit := "0"
defaultPixelsPerUnit := "1"
defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet

Check warning on line 200 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L198-L200

Added lines #L198 - L200 were not covered by tests
defaultAutoAdjustPrice := true
defaultPricePerBroadcaster := ""
defaultBlockPollingInterval := 5
Expand Down Expand Up @@ -278,6 +282,7 @@
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,

Check warning on line 285 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L285

Added line #L285 was not covered by tests
AutoAdjustPrice: &defaultAutoAdjustPrice,
PricePerBroadcaster: &defaultPricePerBroadcaster,
BlockPollingInterval: &defaultBlockPollingInterval,
Expand Down Expand Up @@ -712,6 +717,13 @@
go serviceRegistryWatcher.Watch()
defer serviceRegistryWatcher.Stop()

core.PriceFeedWatcher, err = watchers.NewPriceFeedWatcher(backend, *cfg.PriceFeedAddr)

Check warning on line 720 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L720

Added line #L720 was not covered by tests
leszko marked this conversation as resolved.
Show resolved Hide resolved
// The price feed watch loop is started on demand on first subscribe.
if err != nil {
glog.Errorf("Failed to set up price feed watcher: %v", err)
return

Check warning on line 724 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L722-L724

Added lines #L722 - L724 were not covered by tests
}

n.Balances = core.NewAddressBalances(cleanupInterval)
defer n.Balances.StopCleanup()

Expand All @@ -733,27 +745,44 @@

if *cfg.Orchestrator {
// Set price per pixel base info
if *cfg.PixelsPerUnit <= 0 {
pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnit.IsInt() {
panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit))

Check warning on line 750 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L748-L750

Added lines #L748 - L750 were not covered by tests
}
if pixelsPerUnit.Sign() <= 0 {

Check warning on line 752 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L752

Added line #L752 was not covered by tests
// Can't divide by 0
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %d", *cfg.PixelsPerUnit))
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit))

Check warning on line 754 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L754

Added line #L754 was not covered by tests
}
if cfg.PricePerUnit == nil {
// Prevent orchestrators from unknowingly providing free transcoding
panic(fmt.Errorf("-pricePerUnit must be set"))
}
if *cfg.PricePerUnit < 0 {
panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %d", *cfg.PricePerUnit))
pricePerUnit, currency, err := parsePricePerUnit(*cfg.PricePerUnit)
leszko marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(fmt.Errorf("-pricePerUnit must be a valid integer with an optional currency, provided %v", *cfg.PricePerUnit))
} else if pricePerUnit.Sign() < 0 {
panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %s", pricePerUnit))

Check warning on line 764 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L760-L764

Added lines #L760 - L764 were not covered by tests
}
pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit)
victorges marked this conversation as resolved.
Show resolved Hide resolved
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
glog.Infof("Price: %v wei per pixel\n ", price.FloatString(3))
})
if err != nil {
panic(fmt.Errorf("Error converting price: %v", err))

Check warning on line 771 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L766-L771

Added lines #L766 - L771 were not covered by tests
}
n.SetBasePrice("default", big.NewRat(int64(*cfg.PricePerUnit), int64(*cfg.PixelsPerUnit)))
glog.Infof("Price: %d wei for %d pixels\n ", *cfg.PricePerUnit, *cfg.PixelsPerUnit)

if *cfg.PricePerBroadcaster != "" {
ppb := getBroadcasterPrices(*cfg.PricePerBroadcaster)
for _, p := range ppb {
price := big.NewRat(p.PricePerUnit, p.PixelsPerUnit)
n.SetBasePrice(p.EthAddress, price)
glog.Infof("Price: %v set for broadcaster %v", price.RatString(), p.EthAddress)
n.SetBasePrice("default", autoPrice)

Check warning on line 773 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L773

Added line #L773 was not covered by tests

broadcasterPrices := getBroadcasterPrices(*cfg.PricePerBroadcaster)
for _, p := range broadcasterPrices {
p := p
pricePerPixel := new(big.Rat).Quo(p.PricePerUnit, p.PixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(p.Currency, pricePerPixel, func(price *big.Rat) {
glog.Infof("Price: %v wei per pixel for broadcaster %v", price.FloatString(3), p.EthAddress)
})
if err != nil {
panic(fmt.Errorf("Error converting price for broadcaster %s: %v", p.EthAddress, err))

Check warning on line 783 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L775-L783

Added lines #L775 - L783 were not covered by tests
}
n.SetBasePrice(p.EthAddress, autoPrice)

Check warning on line 785 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L785

Added line #L785 was not covered by tests
}

n.AutoSessionLimit = *cfg.MaxSessions == "auto"
Expand Down Expand Up @@ -817,7 +846,7 @@
if mfv == nil {
panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided. Restart the node with a different valid value for -maxFaceValue", *cfg.MaxFaceValue))
return
} else {

Check warning on line 849 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

if block ends with a return statement, so drop this else and outdent its block
n.SetMaxFaceValue(mfv)
}

Expand Down Expand Up @@ -850,12 +879,30 @@

n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier)

if *cfg.PixelsPerUnit <= 0 {
pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnit.IsInt() {
panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit))

Check warning on line 884 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L882-L884

Added lines #L882 - L884 were not covered by tests
}
if pixelsPerUnit.Sign() <= 0 {

Check warning on line 886 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L886

Added line #L886 was not covered by tests
// Can't divide by 0
panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *cfg.PixelsPerUnit))
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit))

Check warning on line 888 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L888

Added line #L888 was not covered by tests
}
maxPricePerUnit, currency, err := parsePricePerUnit(*cfg.MaxPricePerUnit)
if err != nil {
panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit))

Check warning on line 892 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L890-L892

Added lines #L890 - L892 were not covered by tests
}
if *cfg.MaxPricePerUnit > 0 {
server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*cfg.MaxPricePerUnit), int64(*cfg.PixelsPerUnit)))
if maxPricePerUnit.Sign() > 0 {
pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxTranscodingPrice(price)

Check warning on line 898 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L894-L898

Added lines #L894 - L898 were not covered by tests
}
glog.Infof("Maximum transcoding price: %v wei per pixel\n ", price.FloatString(3))

Check warning on line 900 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L900

Added line #L900 was not covered by tests
})
if err != nil {
panic(fmt.Errorf("Error converting price: %v", err))

Check warning on line 903 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L902-L903

Added lines #L902 - L903 were not covered by tests
}
server.BroadcastCfg.SetMaxPrice(autoPrice)

Check warning on line 905 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L905

Added line #L905 was not covered by tests
} else {
glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit)
glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
Expand Down Expand Up @@ -1420,29 +1467,59 @@
return nil
}

// Format of broadcasterPrices json
// {"broadcasters":[{"ethaddress":"address1","priceperunit":1000,"pixelsperunit":1}, {"ethaddress":"address2","priceperunit":2000,"pixelsperunit":3}]}
type BroadcasterPrices struct {
Prices []BroadcasterPrice `json:"broadcasters"`
}

type BroadcasterPrice struct {
EthAddress string `json:"ethaddress"`
PricePerUnit int64 `json:"priceperunit"`
PixelsPerUnit int64 `json:"pixelsperunit"`
EthAddress string
PricePerUnit *big.Rat
Currency string
PixelsPerUnit *big.Rat
}

func getBroadcasterPrices(broadcasterPrices string) []BroadcasterPrice {
var pricesSet BroadcasterPrices
prices, _ := common.ReadFromFile(broadcasterPrices)
if broadcasterPrices == "" {
return nil

Check warning on line 1479 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1479

Added line #L1479 was not covered by tests
}

// Format of broadcasterPrices json
// {"broadcasters":[{"ethaddress":"address1","priceperunit":0.5,"currency":"USD","pixelsperunit":1}, {"ethaddress":"address2","priceperunit":0.3,"currency":"USD","pixelsperunit":3}]}
var pricesSet struct {
Broadcasters []struct {
EthAddress string `json:"ethaddress"`
// The fields below are specified as a number in the JSON, but we don't want to lose precision so we store the raw characters here and parse as a big.Rat.
// This also allows support for exponential notation for numbers, which is helpful for pricePerUnit which could be a value like 1e12.
PixelsPerUnit json.RawMessage `json:"pixelsperunit"`
PricePerUnit json.RawMessage `json:"priceperunit"`
Currency string `json:"currency"`
} `json:"broadcasters"`
victorges marked this conversation as resolved.
Show resolved Hide resolved
}
pricesFileContent, _ := common.ReadFromFile(broadcasterPrices)

err := json.Unmarshal([]byte(prices), &pricesSet)
err := json.Unmarshal([]byte(pricesFileContent), &pricesSet)
if err != nil {
glog.Errorf("broadcaster prices could not be parsed: %s", err)
return nil
}

return pricesSet.Prices
prices := make([]BroadcasterPrice, len(pricesSet.Broadcasters))
for i, p := range pricesSet.Broadcasters {
pixelsPerUnit, ok := new(big.Rat).SetString(string(p.PixelsPerUnit))
if !ok {
glog.Errorf("Pixels per unit could not be parsed for broadcaster %v. must be a valid number, provided %s", p.EthAddress, p.PixelsPerUnit)
continue

Check warning on line 1507 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1506-L1507

Added lines #L1506 - L1507 were not covered by tests
}
pricePerUnit, ok := new(big.Rat).SetString(string(p.PricePerUnit))
if !ok {
glog.Errorf("Price per unit could not be parsed for broadcaster %v. must be a valid number, provided %s", p.EthAddress, p.PricePerUnit)
continue

Check warning on line 1512 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1511-L1512

Added lines #L1511 - L1512 were not covered by tests
}
prices[i] = BroadcasterPrice{
EthAddress: p.EthAddress,
Currency: p.Currency,
PricePerUnit: pricePerUnit,
PixelsPerUnit: pixelsPerUnit,
}
}

return prices
}

func createSelectionAlgorithm(cfg LivepeerConfig) (common.SelectionAlgorithm, error) {
Expand Down Expand Up @@ -1494,6 +1571,22 @@
return keystore, nil
}

func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) {
pricePerUnitRex := regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`)
match := pricePerUnitRex.FindStringSubmatch(pricePerUnitStr)
victorges marked this conversation as resolved.
Show resolved Hide resolved
if match == nil {
return nil, "", fmt.Errorf("price must be in the format of <price><currency>, provided %v", pricePerUnitStr)
}
price, currency := match[1], match[3]

pricePerUnit, ok := new(big.Rat).SetString(price)
if !ok {
return nil, "", fmt.Errorf("price must be a valid number, provided %v", match[1])

Check warning on line 1584 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1584

Added line #L1584 was not covered by tests
}

return pricePerUnit, currency, nil
}

func refreshOrchPerfScoreLoop(ctx context.Context, region string, orchPerfScoreURL string, score *common.PerfScore) {
for {
refreshOrchPerfScore(region, orchPerfScoreURL, score)
Expand Down
92 changes: 90 additions & 2 deletions cmd/livepeer/starter/starter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func TestParseGetBroadcasterPrices(t *testing.T) {
assert.NotNil(prices)
assert.Equal(2, len(prices))

price1 := big.NewRat(prices[0].PricePerUnit, prices[0].PixelsPerUnit)
price2 := big.NewRat(prices[1].PricePerUnit, prices[1].PixelsPerUnit)
price1 := new(big.Rat).Quo(prices[0].PricePerUnit, prices[0].PixelsPerUnit)
price2 := new(big.Rat).Quo(prices[1].PricePerUnit, prices[1].PixelsPerUnit)
assert.Equal(big.NewRat(1000, 1), price1)
assert.Equal(big.NewRat(2000, 3), price2)
}
Expand Down Expand Up @@ -295,3 +295,91 @@ func TestUpdatePerfScore(t *testing.T) {
}
require.Equal(t, expScores, scores.Scores)
}

func TestParsePricePerUnit(t *testing.T) {
tests := []struct {
name string
pricePerUnitStr string
expectedPrice *big.Rat
expectedCurrency string
expectError bool
}{
{
name: "Valid input with integer price",
pricePerUnitStr: "100USD",
expectedPrice: big.NewRat(100, 1),
expectedCurrency: "USD",
expectError: false,
},
{
name: "Valid input with fractional price",
pricePerUnitStr: "0.13USD",
expectedPrice: big.NewRat(13, 100),
expectedCurrency: "USD",
expectError: false,
},
{
name: "Valid input with decimal price",
pricePerUnitStr: "99.99EUR",
expectedPrice: big.NewRat(9999, 100),
expectedCurrency: "EUR",
expectError: false,
},
{
name: "Lower case currency",
pricePerUnitStr: "99.99eur",
expectedPrice: big.NewRat(9999, 100),
expectedCurrency: "eur",
expectError: false,
},
{
name: "Currency with numbers",
pricePerUnitStr: "420DOG3",
expectedPrice: big.NewRat(420, 1),
expectedCurrency: "DOG3",
expectError: false,
},
{
name: "No specified currency, empty currency",
pricePerUnitStr: "100",
expectedPrice: big.NewRat(100, 1),
expectedCurrency: "",
expectError: false,
},
{
name: "Explicit wei currency",
pricePerUnitStr: "100wei",
expectedPrice: big.NewRat(100, 1),
expectedCurrency: "wei",
expectError: false,
},
{
name: "Invalid number",
pricePerUnitStr: "abcUSD",
expectedPrice: nil,
expectedCurrency: "",
expectError: true,
},
{
name: "Negative price",
pricePerUnitStr: "-100USD",
expectedPrice: nil,
expectedCurrency: "",
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
price, currency, err := parsePricePerUnit(tt.pricePerUnitStr)

if tt.expectError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.True(t, tt.expectedPrice.Cmp(price) == 0)
assert.Equal(t, tt.expectedCurrency, currency)
}
})
}
}
Loading
Loading