diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3ca5044811..df9d710ef8 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -8,6 +8,8 @@ #### General +- [#2981](https://github.com/livepeer/go-livepeer/pull/2981) Add support for prices in custom currencies like USD (@victorges) + #### Broadcaster #### Orchestrator diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index b9ed7a907d..d875a079f8 100755 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -133,7 +133,7 @@ func parseLivepeerConfig() starter.LivepeerConfig { cfg.SelectPriceExpFactor = flag.Float64("selectPriceExpFactor", *cfg.SelectPriceExpFactor, "Expresses how significant a small change of price is for the selection algorithm; default 100") cfg.OrchPerfStatsURL = flag.String("orchPerfStatsUrl", *cfg.OrchPerfStatsURL, "URL of Orchestrator Performance Stream Tester") cfg.Region = flag.String("region", *cfg.Region, "Region in which a broadcaster is deployed; used to select the region while using the orchestrator's performance stats") - cfg.MaxPricePerUnit = flag.Int("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price") + cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr") cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept") // Transcoding: @@ -171,11 +171,12 @@ func parseLivepeerConfig() starter.LivepeerConfig { // Broadcaster deposit multiplier to determine max acceptable ticket faceValue cfg.DepositMultiplier = flag.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets") // Orchestrator base pricing info - cfg.PricePerUnit = flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels") - // Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice - cfg.PixelsPerUnit = flag.Int("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel") + cfg.PricePerUnit = flag.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr") + // Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit + cfg.PixelsPerUnit = flag.String("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel") + cfg.PriceFeedAddr = flag.String("priceFeedAddr", *cfg.PriceFeedAddr, "ETH address of the Chainlink price feed contract. Used for custom currencies conversion on -pricePerUnit or -maxPricePerUnit") cfg.AutoAdjustPrice = flag.Bool("autoAdjustPrice", *cfg.AutoAdjustPrice, "Enable/disable automatic price adjustments based on the overhead for redeeming tickets") - cfg.PricePerBroadcaster = flag.String("pricePerBroadcaster", *cfg.PricePerBroadcaster, `json list of price per broadcaster or path to json config file. Example: {"broadcasters":[{"ethaddress":"address1","priceperunit":1000,"pixelsperunit":1},{"ethaddress":"address2","priceperunit":1200,"pixelsperunit":1}]}`) + cfg.PricePerBroadcaster = flag.String("pricePerBroadcaster", *cfg.PricePerBroadcaster, `json list of price per broadcaster or path to json config file. Example: {"broadcasters":[{"ethaddress":"address1","priceperunit":0.5,"currency":"USD","pixelsperunit":1000000000000},{"ethaddress":"address2","priceperunit":0.3,"currency":"USD","pixelsperunit":1000000000000}]}`) // Interval to poll for blocks cfg.BlockPollingInterval = flag.Int("blockPollingInterval", *cfg.BlockPollingInterval, "Interval in seconds at which different blockchain event services poll for blocks") // Redemption service diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 8c5da0bf2a..2896d95431 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -15,6 +15,7 @@ import ( "os" "os/user" "path/filepath" + "regexp" "strconv" "strings" "time" @@ -32,6 +33,7 @@ import ( "github.com/livepeer/go-livepeer/eth" "github.com/livepeer/go-livepeer/eth/blockwatch" "github.com/livepeer/go-livepeer/eth/watchers" + "github.com/livepeer/go-livepeer/monitor" lpmon "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-livepeer/pm" "github.com/livepeer/go-livepeer/server" @@ -95,7 +97,7 @@ type LivepeerConfig struct { SelectPriceExpFactor *float64 OrchPerfStatsURL *string Region *string - MaxPricePerUnit *int + MaxPricePerUnit *string MinPerfScore *float64 MaxSessions *string CurrentManifest *bool @@ -118,8 +120,9 @@ type LivepeerConfig struct { MaxTicketEV *string MaxTotalEV *string DepositMultiplier *int - PricePerUnit *int - PixelsPerUnit *int + PricePerUnit *string + PixelsPerUnit *string + PriceFeedAddr *string AutoAdjustPrice *bool PricePerBroadcaster *string BlockPollingInterval *int @@ -192,8 +195,9 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultMaxTicketEV := "3000000000000" defaultMaxTotalEV := "20000000000000" defaultDepositMultiplier := 1 - defaultMaxPricePerUnit := 0 - defaultPixelsPerUnit := 1 + defaultMaxPricePerUnit := "0" + defaultPixelsPerUnit := "1" + defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet defaultAutoAdjustPrice := true defaultPricePerBroadcaster := "" defaultBlockPollingInterval := 5 @@ -278,6 +282,7 @@ func DefaultLivepeerConfig() LivepeerConfig { DepositMultiplier: &defaultDepositMultiplier, MaxPricePerUnit: &defaultMaxPricePerUnit, PixelsPerUnit: &defaultPixelsPerUnit, + PriceFeedAddr: &defaultPriceFeedAddr, AutoAdjustPrice: &defaultAutoAdjustPrice, PricePerBroadcaster: &defaultPricePerBroadcaster, BlockPollingInterval: &defaultBlockPollingInterval, @@ -712,6 +717,13 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { go serviceRegistryWatcher.Watch() defer serviceRegistryWatcher.Stop() + core.PriceFeedWatcher, err = watchers.NewPriceFeedWatcher(backend, *cfg.PriceFeedAddr) + // The price feed watch loop is started on demand on first subscribe. + if err != nil { + glog.Errorf("Failed to set up price feed watcher: %v", err) + return + } + n.Balances = core.NewAddressBalances(cleanupInterval) defer n.Balances.StopCleanup() @@ -733,27 +745,44 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { if *cfg.Orchestrator { // Set price per pixel base info - if *cfg.PixelsPerUnit <= 0 { + pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit) + if !ok || !pixelsPerUnit.IsInt() { + panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit)) + } + if pixelsPerUnit.Sign() <= 0 { // Can't divide by 0 - panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %d", *cfg.PixelsPerUnit)) + panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit)) } if cfg.PricePerUnit == nil { // Prevent orchestrators from unknowingly providing free transcoding panic(fmt.Errorf("-pricePerUnit must be set")) } - if *cfg.PricePerUnit < 0 { - panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %d", *cfg.PricePerUnit)) + pricePerUnit, currency, err := parsePricePerUnit(*cfg.PricePerUnit) + if err != nil { + panic(fmt.Errorf("-pricePerUnit must be a valid integer with an optional currency, provided %v", *cfg.PricePerUnit)) + } else if pricePerUnit.Sign() < 0 { + panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %s", pricePerUnit)) + } + pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + glog.Infof("Price: %v wei per pixel\n ", price.FloatString(3)) + }) + if err != nil { + panic(fmt.Errorf("Error converting price: %v", err)) } - n.SetBasePrice("default", big.NewRat(int64(*cfg.PricePerUnit), int64(*cfg.PixelsPerUnit))) - glog.Infof("Price: %d wei for %d pixels\n ", *cfg.PricePerUnit, *cfg.PixelsPerUnit) - - if *cfg.PricePerBroadcaster != "" { - ppb := getBroadcasterPrices(*cfg.PricePerBroadcaster) - for _, p := range ppb { - price := big.NewRat(p.PricePerUnit, p.PixelsPerUnit) - n.SetBasePrice(p.EthAddress, price) - glog.Infof("Price: %v set for broadcaster %v", price.RatString(), p.EthAddress) + n.SetBasePrice("default", autoPrice) + + broadcasterPrices := getBroadcasterPrices(*cfg.PricePerBroadcaster) + for _, p := range broadcasterPrices { + p := p + pricePerPixel := new(big.Rat).Quo(p.PricePerUnit, p.PixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(p.Currency, pricePerPixel, func(price *big.Rat) { + glog.Infof("Price: %v wei per pixel for broadcaster %v", price.FloatString(3), p.EthAddress) + }) + if err != nil { + panic(fmt.Errorf("Error converting price for broadcaster %s: %v", p.EthAddress, err)) } + n.SetBasePrice(p.EthAddress, autoPrice) } n.AutoSessionLimit = *cfg.MaxSessions == "auto" @@ -850,12 +879,30 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier) - if *cfg.PixelsPerUnit <= 0 { + pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit) + if !ok || !pixelsPerUnit.IsInt() { + panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit)) + } + if pixelsPerUnit.Sign() <= 0 { // Can't divide by 0 - panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *cfg.PixelsPerUnit)) + panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit)) + } + maxPricePerUnit, currency, err := parsePricePerUnit(*cfg.MaxPricePerUnit) + if err != nil { + panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit)) } - if *cfg.MaxPricePerUnit > 0 { - server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*cfg.MaxPricePerUnit), int64(*cfg.PixelsPerUnit))) + if maxPricePerUnit.Sign() > 0 { + pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + if monitor.Enabled { + monitor.MaxTranscodingPrice(price) + } + glog.Infof("Maximum transcoding price: %v wei per pixel\n ", price.FloatString(3)) + }) + if err != nil { + panic(fmt.Errorf("Error converting price: %v", err)) + } + server.BroadcastCfg.SetMaxPrice(autoPrice) } else { glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit) glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values") @@ -1420,29 +1467,59 @@ func checkOrStoreChainID(dbh *common.DB, chainID *big.Int) error { return nil } -// Format of broadcasterPrices json -// {"broadcasters":[{"ethaddress":"address1","priceperunit":1000,"pixelsperunit":1}, {"ethaddress":"address2","priceperunit":2000,"pixelsperunit":3}]} -type BroadcasterPrices struct { - Prices []BroadcasterPrice `json:"broadcasters"` -} - type BroadcasterPrice struct { - EthAddress string `json:"ethaddress"` - PricePerUnit int64 `json:"priceperunit"` - PixelsPerUnit int64 `json:"pixelsperunit"` + EthAddress string + PricePerUnit *big.Rat + Currency string + PixelsPerUnit *big.Rat } func getBroadcasterPrices(broadcasterPrices string) []BroadcasterPrice { - var pricesSet BroadcasterPrices - prices, _ := common.ReadFromFile(broadcasterPrices) + if broadcasterPrices == "" { + return nil + } + + // Format of broadcasterPrices json + // {"broadcasters":[{"ethaddress":"address1","priceperunit":0.5,"currency":"USD","pixelsperunit":1}, {"ethaddress":"address2","priceperunit":0.3,"currency":"USD","pixelsperunit":3}]} + var pricesSet struct { + Broadcasters []struct { + EthAddress string `json:"ethaddress"` + // The fields below are specified as a number in the JSON, but we don't want to lose precision so we store the raw characters here and parse as a big.Rat. + // This also allows support for exponential notation for numbers, which is helpful for pricePerUnit which could be a value like 1e12. + PixelsPerUnit json.RawMessage `json:"pixelsperunit"` + PricePerUnit json.RawMessage `json:"priceperunit"` + Currency string `json:"currency"` + } `json:"broadcasters"` + } + pricesFileContent, _ := common.ReadFromFile(broadcasterPrices) - err := json.Unmarshal([]byte(prices), &pricesSet) + err := json.Unmarshal([]byte(pricesFileContent), &pricesSet) if err != nil { glog.Errorf("broadcaster prices could not be parsed: %s", err) return nil } - return pricesSet.Prices + prices := make([]BroadcasterPrice, len(pricesSet.Broadcasters)) + for i, p := range pricesSet.Broadcasters { + pixelsPerUnit, ok := new(big.Rat).SetString(string(p.PixelsPerUnit)) + if !ok { + glog.Errorf("Pixels per unit could not be parsed for broadcaster %v. must be a valid number, provided %s", p.EthAddress, p.PixelsPerUnit) + continue + } + pricePerUnit, ok := new(big.Rat).SetString(string(p.PricePerUnit)) + if !ok { + glog.Errorf("Price per unit could not be parsed for broadcaster %v. must be a valid number, provided %s", p.EthAddress, p.PricePerUnit) + continue + } + prices[i] = BroadcasterPrice{ + EthAddress: p.EthAddress, + Currency: p.Currency, + PricePerUnit: pricePerUnit, + PixelsPerUnit: pixelsPerUnit, + } + } + + return prices } func createSelectionAlgorithm(cfg LivepeerConfig) (common.SelectionAlgorithm, error) { @@ -1494,6 +1571,22 @@ func parseEthKeystorePath(ethKeystorePath string) (keystorePath, error) { return keystore, nil } +func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) { + pricePerUnitRex := regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`) + match := pricePerUnitRex.FindStringSubmatch(pricePerUnitStr) + if match == nil { + return nil, "", fmt.Errorf("price must be in the format of , provided %v", pricePerUnitStr) + } + price, currency := match[1], match[3] + + pricePerUnit, ok := new(big.Rat).SetString(price) + if !ok { + return nil, "", fmt.Errorf("price must be a valid number, provided %v", match[1]) + } + + return pricePerUnit, currency, nil +} + func refreshOrchPerfScoreLoop(ctx context.Context, region string, orchPerfScoreURL string, score *common.PerfScore) { for { refreshOrchPerfScore(region, orchPerfScoreURL, score) diff --git a/cmd/livepeer/starter/starter_test.go b/cmd/livepeer/starter/starter_test.go index 18a08633a2..45d3620b8e 100644 --- a/cmd/livepeer/starter/starter_test.go +++ b/cmd/livepeer/starter/starter_test.go @@ -96,8 +96,8 @@ func TestParseGetBroadcasterPrices(t *testing.T) { assert.NotNil(prices) assert.Equal(2, len(prices)) - price1 := big.NewRat(prices[0].PricePerUnit, prices[0].PixelsPerUnit) - price2 := big.NewRat(prices[1].PricePerUnit, prices[1].PixelsPerUnit) + price1 := new(big.Rat).Quo(prices[0].PricePerUnit, prices[0].PixelsPerUnit) + price2 := new(big.Rat).Quo(prices[1].PricePerUnit, prices[1].PixelsPerUnit) assert.Equal(big.NewRat(1000, 1), price1) assert.Equal(big.NewRat(2000, 3), price2) } @@ -295,3 +295,91 @@ func TestUpdatePerfScore(t *testing.T) { } require.Equal(t, expScores, scores.Scores) } + +func TestParsePricePerUnit(t *testing.T) { + tests := []struct { + name string + pricePerUnitStr string + expectedPrice *big.Rat + expectedCurrency string + expectError bool + }{ + { + name: "Valid input with integer price", + pricePerUnitStr: "100USD", + expectedPrice: big.NewRat(100, 1), + expectedCurrency: "USD", + expectError: false, + }, + { + name: "Valid input with fractional price", + pricePerUnitStr: "0.13USD", + expectedPrice: big.NewRat(13, 100), + expectedCurrency: "USD", + expectError: false, + }, + { + name: "Valid input with decimal price", + pricePerUnitStr: "99.99EUR", + expectedPrice: big.NewRat(9999, 100), + expectedCurrency: "EUR", + expectError: false, + }, + { + name: "Lower case currency", + pricePerUnitStr: "99.99eur", + expectedPrice: big.NewRat(9999, 100), + expectedCurrency: "eur", + expectError: false, + }, + { + name: "Currency with numbers", + pricePerUnitStr: "420DOG3", + expectedPrice: big.NewRat(420, 1), + expectedCurrency: "DOG3", + expectError: false, + }, + { + name: "No specified currency, empty currency", + pricePerUnitStr: "100", + expectedPrice: big.NewRat(100, 1), + expectedCurrency: "", + expectError: false, + }, + { + name: "Explicit wei currency", + pricePerUnitStr: "100wei", + expectedPrice: big.NewRat(100, 1), + expectedCurrency: "wei", + expectError: false, + }, + { + name: "Invalid number", + pricePerUnitStr: "abcUSD", + expectedPrice: nil, + expectedCurrency: "", + expectError: true, + }, + { + name: "Negative price", + pricePerUnitStr: "-100USD", + expectedPrice: nil, + expectedCurrency: "", + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + price, currency, err := parsePricePerUnit(tt.pricePerUnitStr) + + if tt.expectError { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.True(t, tt.expectedPrice.Cmp(price) == 0) + assert.Equal(t, tt.expectedCurrency, currency) + } + }) + } +} diff --git a/cmd/livepeer_cli/wizard_broadcast.go b/cmd/livepeer_cli/wizard_broadcast.go index b7967b0dd6..8f2c59e917 100644 --- a/cmd/livepeer_cli/wizard_broadcast.go +++ b/cmd/livepeer_cli/wizard_broadcast.go @@ -57,10 +57,14 @@ func (w *wizard) setBroadcastConfig() { fmt.Printf("eg. 1 wei / 10 pixels = 0,1 wei per pixel \n") fmt.Printf("\n") fmt.Printf("Enter amount of pixels that make up a single unit (default: 1 pixel) - ") - pixelsPerUnit := w.readDefaultInt(1) + // Read numbers as strings not to lose precision and support big numbers + pixelsPerUnit := w.readDefaultString("1") fmt.Printf("\n") - fmt.Printf("Enter the maximum price to pay for %d pixels in Wei (required) - ", pixelsPerUnit) - maxPricePerUnit := w.readDefaultInt(0) + fmt.Printf("Enter the currency for the price per unit (default: Wei) - ") + currency := w.readDefaultString("Wei") + fmt.Printf("\n") + fmt.Printf("Enter the maximum price to pay for %s pixels in %s (default: 0) - ", pixelsPerUnit, currency) + maxPricePerUnit := w.readDefaultString("0") opts := w.allTranscodingOptions() if opts == nil { @@ -77,12 +81,18 @@ func (w *wizard) setBroadcastConfig() { } val := url.Values{ - "pixelsPerUnit": {fmt.Sprintf("%v", strconv.Itoa(pixelsPerUnit))}, - "maxPricePerUnit": {fmt.Sprintf("%v", strconv.Itoa(maxPricePerUnit))}, + "pixelsPerUnit": {fmt.Sprintf("%v", pixelsPerUnit)}, + "currency": {fmt.Sprintf("%v", currency)}, + "maxPricePerUnit": {fmt.Sprintf("%v", maxPricePerUnit)}, "transcodingOptions": {fmt.Sprintf("%v", transOpts)}, } - httpPostWithParams(fmt.Sprintf("http://%v:%v/setBroadcastConfig", w.host, w.httpPort), val) + result, ok := httpPostWithParams(fmt.Sprintf("http://%v:%v/setBroadcastConfig", w.host, w.httpPort), val) + if !ok { + fmt.Printf("Error applying configuration: %s\n", result) + } else { + fmt.Printf("Configuration applied successfully\n") + } } func (w *wizard) idListToVideoProfileList(idList string, opts map[int]string) (string, error) { diff --git a/cmd/livepeer_cli/wizard_stats.go b/cmd/livepeer_cli/wizard_stats.go index 8a251fc9a4..bea3b07908 100644 --- a/cmd/livepeer_cli/wizard_stats.go +++ b/cmd/livepeer_cli/wizard_stats.go @@ -171,14 +171,10 @@ func (w *wizard) broadcastStats() { } price, transcodingOptions := w.getBroadcastConfig() - priceString := "n/a" - if price != nil { - priceString = fmt.Sprintf("%v wei / %v pixels", price.Num().Int64(), price.Denom().Int64()) - } table := tablewriter.NewWriter(os.Stdout) data := [][]string{ - {"Max Price Per Pixel", priceString}, + {"Max Price Per Pixel", formatPricePerPixel(price)}, {"Broadcast Transcoding Options", transcodingOptions}, {"Deposit", eth.FormatUnits(sender.Deposit, "ETH")}, {"Reserve", eth.FormatUnits(sender.Reserve.FundsRemaining, "ETH")}, @@ -227,7 +223,7 @@ func (w *wizard) orchestratorStats() { {"Reward Cut (%)", eth.FormatPerc(t.RewardCut)}, {"Fee Cut (%)", eth.FormatPerc(flipPerc(t.FeeShare))}, {"Last Reward Round", t.LastRewardRound.String()}, - {"Base price per pixel", fmt.Sprintf("%v wei / %v pixels", priceInfo.Num(), priceInfo.Denom())}, + {"Base price per pixel", formatPricePerPixel(priceInfo)}, {"Base price for broadcasters", b_prices}, } @@ -488,7 +484,9 @@ func (w *wizard) getBroadcasterPrices() (string, error) { return "", err } - var status map[string]interface{} + var status struct { + BroadcasterPrices map[string]*big.Rat `json:"BroadcasterPrices"` + } err = json.Unmarshal(result, &status) if err != nil { return "", err @@ -496,13 +494,21 @@ func (w *wizard) getBroadcasterPrices() (string, error) { prices := new(bytes.Buffer) - if broadcasterPrices, ok := status["BroadcasterPrices"]; ok { - for b, p := range broadcasterPrices.(map[string]interface{}) { - if b != "default" { - fmt.Fprintf(prices, "%s: %s per pixel\n", b, p) - } + for b, p := range status.BroadcasterPrices { + if b != "default" { + fmt.Fprintf(prices, "%s: %s\n", b, formatPricePerPixel(p)) } } return prices.String(), nil } + +func formatPricePerPixel(price *big.Rat) string { + if price == nil { + return "n/a" + } + if price.IsInt() { + return fmt.Sprintf("%v wei/pixel", price.RatString()) + } + return fmt.Sprintf("%v wei/pixel (%v/%v)", price.FloatString(3), price.Num(), price.Denom()) +} diff --git a/cmd/livepeer_cli/wizard_transcoder.go b/cmd/livepeer_cli/wizard_transcoder.go index b25644a744..2416aadbd0 100644 --- a/cmd/livepeer_cli/wizard_transcoder.go +++ b/cmd/livepeer_cli/wizard_transcoder.go @@ -43,13 +43,7 @@ func myHostPort() string { return "https://" + ip + ":" + defaultRPCPort } -func (w *wizard) promptOrchestratorConfig() (float64, float64, int, int, string) { - var ( - blockRewardCut float64 - feeCut float64 - addr string - ) - +func (w *wizard) promptOrchestratorConfig() (blockRewardCut, feeCut float64, pricePerUnit, currency, pixelsPerUnit, serviceURI string) { orch, _, err := w.getOrchestratorInfo() if err != nil || orch == nil { fmt.Println("unable to get current reward cut and fee cut") @@ -68,17 +62,23 @@ func (w *wizard) promptOrchestratorConfig() (float64, float64, int, int, string) fmt.Println("eg. 1 wei / 10 pixels = 0,1 wei per pixel") fmt.Println() fmt.Printf("Enter amount of pixels that make up a single unit (default: 1 pixel) ") - pixelsPerUnit := w.readDefaultInt(1) - fmt.Printf("Enter the price for %d pixels in Wei (required) ", pixelsPerUnit) - pricePerUnit := w.readDefaultInt(0) + // Read numbers as strings not to lose precision and support big numbers + pixelsPerUnit = w.readDefaultString("1") + fmt.Println() + fmt.Printf("Enter the currency for the price per unit (default: Wei) ") + currency = w.readDefaultString("Wei") + fmt.Println() + fmt.Printf("Enter the price for %s pixels in %s (default: 0) ", pixelsPerUnit, currency) + pricePerUnit = w.readDefaultString("0") + var addr string if orch.ServiceURI == "" { addr = myHostPort() } else { addr = orch.ServiceURI } fmt.Printf("Enter the public host:port of node (default: %v)", addr) - serviceURI := w.readStringAndValidate(func(in string) (string, error) { + serviceURI = w.readStringAndValidate(func(in string) (string, error) { if "" == in { in = addr } @@ -92,7 +92,7 @@ func (w *wizard) promptOrchestratorConfig() (float64, float64, int, int, string) return in, nil }) - return blockRewardCut, 100 - feeCut, pricePerUnit, pixelsPerUnit, serviceURI + return blockRewardCut, 100 - feeCut, pricePerUnit, currency, pixelsPerUnit, serviceURI } func (w *wizard) activateOrchestrator() { @@ -196,13 +196,14 @@ func (w *wizard) setOrchestratorConfig() { } func (w *wizard) getOrchestratorConfigFormValues() url.Values { - blockRewardCut, feeShare, pricePerUnit, pixelsPerUnit, serviceURI := w.promptOrchestratorConfig() + blockRewardCut, feeShare, pricePerUnit, currency, pixelsPerUnit, serviceURI := w.promptOrchestratorConfig() return url.Values{ "blockRewardCut": {fmt.Sprintf("%v", blockRewardCut)}, "feeShare": {fmt.Sprintf("%v", feeShare)}, - "pricePerUnit": {fmt.Sprintf("%v", strconv.Itoa(pricePerUnit))}, - "pixelsPerUnit": {fmt.Sprintf("%v", strconv.Itoa(pixelsPerUnit))}, + "pricePerUnit": {fmt.Sprintf("%v", pricePerUnit)}, + "currency": {fmt.Sprintf("%v", currency)}, + "pixelsPerUnit": {fmt.Sprintf("%v", pixelsPerUnit)}, "serviceURI": {fmt.Sprintf("%v", serviceURI)}, } } @@ -319,18 +320,22 @@ func (w *wizard) setPriceForBroadcaster() { return in, nil }) - fmt.Println("Enter price per unit:") - price := w.readDefaultInt(0) - fmt.Println("Enter pixels per unit:") - pixels := w.readDefaultInt(1) + fmt.Println("Enter pixels per unit (default: 1 pixel)") + // Read numbers as strings not to lose precision and support big numbers + pixels := w.readDefaultString("1") + fmt.Println("Enter currency for the price per unit (default: Wei)") + currency := w.readDefaultString("Wei") + fmt.Println("Enter price per unit (default: 0)") + price := w.readDefaultString("0") data := url.Values{ - "pricePerUnit": {fmt.Sprintf("%v", strconv.Itoa(price))}, - "pixelsPerUnit": {fmt.Sprintf("%v", strconv.Itoa(pixels))}, + "pricePerUnit": {fmt.Sprintf("%v", price)}, + "currency": {fmt.Sprintf("%v", currency)}, + "pixelsPerUnit": {fmt.Sprintf("%v", pixels)}, "broadcasterEthAddr": {fmt.Sprintf("%v", ethaddr)}, } result, ok := httpPostWithParams(fmt.Sprintf("http://%v:%v/setPriceForBroadcaster", w.host, w.httpPort), data) if ok { - fmt.Printf("Price for broadcaster %v set to %v gwei per %v pixels", ethaddr, price, pixels) + fmt.Printf("Price for broadcaster %v set to %v %v per %v pixels", ethaddr, price, currency, pixels) return } else { fmt.Printf("Error setting price for broadcaster: %v", result) diff --git a/core/autoconvertedprice.go b/core/autoconvertedprice.go new file mode 100644 index 0000000000..b248937db1 --- /dev/null +++ b/core/autoconvertedprice.go @@ -0,0 +1,139 @@ +package core + +import ( + "context" + "fmt" + "math/big" + "strings" + "sync" + + "github.com/livepeer/go-livepeer/eth" + "github.com/livepeer/go-livepeer/eth/watchers" +) + +// PriceFeedWatcher is a global instance of a PriceFeedWatcher. It must be +// initialized before creating an AutoConvertedPrice instance. +var PriceFeedWatcher watchers.PriceFeedWatcher + +// Number of wei in 1 ETH +var weiPerETH = big.NewRat(1e18, 1) + +// AutoConvertedPrice represents a price that is automatically converted to wei +// based on the current price of ETH in a given currency. It uses the static +// PriceFeedWatcher that must be configured before creating an instance. +type AutoConvertedPrice struct { + cancelSubscription func() + onUpdate func(*big.Rat) + basePrice *big.Rat + + mu sync.RWMutex + current *big.Rat +} + +// NewFixedPrice creates a new AutoConvertedPrice with a fixed price in wei. +func NewFixedPrice(price *big.Rat) *AutoConvertedPrice { + return &AutoConvertedPrice{current: price} +} + +// NewAutoConvertedPrice creates a new AutoConvertedPrice instance with the given +// currency and base price. The onUpdate function is optional and gets called +// whenever the price is updated (also with the initial price). The Stop function +// must be called to free resources when the price is no longer needed. +func NewAutoConvertedPrice(currency string, basePrice *big.Rat, onUpdate func(*big.Rat)) (*AutoConvertedPrice, error) { + if onUpdate == nil { + onUpdate = func(*big.Rat) {} + } + + // Default currency (wei/eth) doesn't need the conversion loop + if lcurr := strings.ToLower(currency); lcurr == "" || lcurr == "wei" || lcurr == "eth" { + price := basePrice + if lcurr == "eth" { + price = new(big.Rat).Mul(basePrice, weiPerETH) + } + onUpdate(price) + return NewFixedPrice(price), nil + } + + if PriceFeedWatcher == nil { + return nil, fmt.Errorf("PriceFeedWatcher is not initialized") + } + + base, quote, err := PriceFeedWatcher.Currencies() + if err != nil { + return nil, fmt.Errorf("error getting price feed currencies: %v", err) + } + base, quote, currency = strings.ToUpper(base), strings.ToUpper(quote), strings.ToUpper(currency) + if base != "ETH" && quote != "ETH" { + return nil, fmt.Errorf("price feed does not have ETH as a currency (%v/%v)", base, quote) + } + if base != currency && quote != currency { + return nil, fmt.Errorf("price feed does not have %v as a currency (%v/%v)", currency, base, quote) + } + + currencyPrice, err := PriceFeedWatcher.Current() + if err != nil { + return nil, fmt.Errorf("error getting current price data: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + price := &AutoConvertedPrice{ + cancelSubscription: cancel, + onUpdate: onUpdate, + basePrice: basePrice, + current: new(big.Rat).Mul(basePrice, currencyToWeiMultiplier(currencyPrice, base)), + } + // Trigger the initial update with the current price + onUpdate(price.current) + + price.startAutoConvertLoop(ctx, base) + + return price, nil +} + +// Value returns the current price in wei. +func (a *AutoConvertedPrice) Value() *big.Rat { + a.mu.RLock() + defer a.mu.RUnlock() + return a.current +} + +// Stop unsubscribes from the price feed and frees resources from the +// auto-conversion loop. +func (a *AutoConvertedPrice) Stop() { + a.mu.Lock() + defer a.mu.Unlock() + if a.cancelSubscription != nil { + a.cancelSubscription() + a.cancelSubscription = nil + } +} + +func (a *AutoConvertedPrice) startAutoConvertLoop(ctx context.Context, baseCurrency string) { + priceUpdated := make(chan eth.PriceData, 1) + PriceFeedWatcher.Subscribe(ctx, priceUpdated) + go func() { + for { + select { + case <-ctx.Done(): + return + case currencyPrice := <-priceUpdated: + a.mu.Lock() + a.current = new(big.Rat).Mul(a.basePrice, currencyToWeiMultiplier(currencyPrice, baseCurrency)) + a.mu.Unlock() + + a.onUpdate(a.current) + } + } + }() +} + +// currencyToWeiMultiplier calculates the multiplier to convert the value +// specified in the custom currency to wei. +func currencyToWeiMultiplier(data eth.PriceData, baseCurrency string) *big.Rat { + ethMultipler := data.Price + if baseCurrency == "ETH" { + // Invert the multiplier if the quote is in the form ETH / X + ethMultipler = new(big.Rat).Inv(ethMultipler) + } + return new(big.Rat).Mul(ethMultipler, weiPerETH) +} diff --git a/core/autoconvertedprice_test.go b/core/autoconvertedprice_test.go new file mode 100644 index 0000000000..54db6cfde0 --- /dev/null +++ b/core/autoconvertedprice_test.go @@ -0,0 +1,258 @@ +package core + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/livepeer/go-livepeer/eth" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNewAutoConvertedPrice(t *testing.T) { + t.Run("PriceFeedWatcher not initialized", func(t *testing.T) { + _, err := NewAutoConvertedPrice("USD", big.NewRat(1, 1), nil) + require.Error(t, err) + }) + + watcherMock := NewPriceFeedWatcherMock(t) + PriceFeedWatcher = watcherMock + watcherMock.On("Currencies").Return("ETH", "USD", nil) + + t.Run("Fixed price for wei", func(t *testing.T) { + price, err := NewAutoConvertedPrice("wei", big.NewRat(1, 1), nil) + require.NoError(t, err) + require.Equal(t, big.NewRat(1, 1), price.Value()) + require.Nil(t, price.cancelSubscription) + }) + + t.Run("Auto-converted price for ETH", func(t *testing.T) { + price, err := NewAutoConvertedPrice("ETH", big.NewRat(2, 1), nil) + require.NoError(t, err) + require.Equal(t, big.NewRat(2e18, 1), price.Value()) // 2 ETH in wei + require.Nil(t, price.cancelSubscription) + }) + + t.Run("Auto-converted price for USD", func(t *testing.T) { + watcherMock.On("Current").Return(eth.PriceData{Price: big.NewRat(100, 1)}, nil) + watcherMock.On("Subscribe", mock.Anything, mock.Anything).Once() + price, err := NewAutoConvertedPrice("USD", big.NewRat(2, 1), nil) + require.NoError(t, err) + require.Equal(t, big.NewRat(2e16, 1), price.Value()) // 2 USD * 1/100 ETH/USD + require.NotNil(t, price.cancelSubscription) + price.Stop() + }) + + t.Run("Currency not supported by feed", func(t *testing.T) { + _, err := NewAutoConvertedPrice("GBP", big.NewRat(1, 1), nil) + require.Error(t, err) + }) + + t.Run("Currency ETH not supported by feed", func(t *testing.T) { + // set up a new mock to change the currencies returned + watcherMock := NewPriceFeedWatcherMock(t) + PriceFeedWatcher = watcherMock + watcherMock.On("Currencies").Return("wei", "USD", nil) + + _, err := NewAutoConvertedPrice("USD", big.NewRat(1, 1), nil) + require.Error(t, err) + }) + + t.Run("Auto-converted price for inverted quote", func(t *testing.T) { + // set up a new mock to change the currencies returned + watcherMock := NewPriceFeedWatcherMock(t) + PriceFeedWatcher = watcherMock + watcherMock.On("Currencies").Return("USD", "ETH", nil) + watcherMock.On("Current").Return(eth.PriceData{Price: big.NewRat(1, 420)}, nil) + watcherMock.On("Subscribe", mock.Anything, mock.Anything).Once() + price, err := NewAutoConvertedPrice("USD", big.NewRat(66, 1), nil) + require.NoError(t, err) + require.Equal(t, big.NewRat(11e17, 7), price.Value()) // 66 USD * 1/420 ETH/USD + require.NotNil(t, price.cancelSubscription) + price.Stop() + }) +} + +func TestAutoConvertedPrice_Update(t *testing.T) { + require := require.New(t) + watcherMock := NewPriceFeedWatcherMock(t) + PriceFeedWatcher = watcherMock + + watcherMock.On("Currencies").Return("ETH", "USD", nil) + watcherMock.On("Current").Return(eth.PriceData{Price: big.NewRat(3000, 1)}, nil) + + priceUpdatedChan := make(chan *big.Rat, 1) + onUpdate := func(price *big.Rat) { + priceUpdatedChan <- price + } + + var sink chan<- eth.PriceData + watcherMock.On("Subscribe", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + sink = args.Get(1).(chan<- eth.PriceData) + }).Once() + + price, err := NewAutoConvertedPrice("USD", big.NewRat(50, 1), onUpdate) + require.NoError(err) + require.NotNil(t, price.cancelSubscription) + defer price.Stop() + watcherMock.AssertExpectations(t) + + require.Equal(big.NewRat(5e16, 3), price.Value()) // 50 USD * 1/3000 ETH/USD + require.Equal(big.NewRat(5e16, 3), <-priceUpdatedChan) // initial update must be sent + + // Simulate a price update + sink <- eth.PriceData{Price: big.NewRat(6000, 1)} + + select { + case updatedPrice := <-priceUpdatedChan: + require.Equal(big.NewRat(5e16, 6), updatedPrice) // 50 USD * 1/6000 USD/ETH + require.Equal(big.NewRat(5e16, 6), price.Value()) // must also udpate current value + case <-time.After(time.Second): + t.Fatal("Expected price update not received") + } +} + +func TestAutoConvertedPrice_Stop(t *testing.T) { + require := require.New(t) + watcherMock := NewPriceFeedWatcherMock(t) + PriceFeedWatcher = watcherMock + + watcherMock.On("Currencies").Return("ETH", "USD", nil) + watcherMock.On("Current").Return(eth.PriceData{Price: big.NewRat(100, 1)}, nil) + + var subsCtx context.Context + watcherMock.On("Subscribe", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + subsCtx = args.Get(0).(context.Context) + }).Once() + + price, err := NewAutoConvertedPrice("USD", big.NewRat(50, 1), nil) + require.NoError(err) + require.NotNil(t, price.cancelSubscription) + + price.Stop() + require.Nil(price.cancelSubscription) + require.Error(subsCtx.Err()) +} + +func TestCurrencyToWeiMultiplier(t *testing.T) { + tests := []struct { + name string + data eth.PriceData + baseCurrency string + expectedWei *big.Rat + }{ + { + name: "Base currency is ETH", + data: eth.PriceData{Price: big.NewRat(500, 1)}, // 500 USD per ETH + baseCurrency: "ETH", + expectedWei: big.NewRat(1e18, 500), // (1 / 500 USD/ETH) * 1e18 wei/ETH + }, + { + name: "Base currency is not ETH", + data: eth.PriceData{Price: big.NewRat(1, 2000)}, // 1/2000 ETH per USD + baseCurrency: "USD", + expectedWei: big.NewRat(5e14, 1), // (1 * 1/2000 ETH/USD) * 1e18 wei/ETH + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := currencyToWeiMultiplier(tt.data, tt.baseCurrency) + assert.Equal(t, 0, tt.expectedWei.Cmp(result)) + }) + } +} + +// Auto-generated code from here down. +// +// Code generated by mockery v2.42.1. DO NOT EDIT. + +// PriceFeedWatcherMock is an autogenerated mock type for the PriceFeedWatcher type +type PriceFeedWatcherMock struct { + mock.Mock +} + +// Currencies provides a mock function with given fields: +func (_m *PriceFeedWatcherMock) Currencies() (string, string, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Currencies") + } + + var r0 string + var r1 string + var r2 error + if rf, ok := ret.Get(0).(func() (string, string, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func() string); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(string) + } + + if rf, ok := ret.Get(2).(func() error); ok { + r2 = rf() + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Current provides a mock function with given fields: +func (_m *PriceFeedWatcherMock) Current() (eth.PriceData, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Current") + } + + var r0 eth.PriceData + var r1 error + if rf, ok := ret.Get(0).(func() (eth.PriceData, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() eth.PriceData); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(eth.PriceData) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Subscribe provides a mock function with given fields: ctx, sink +func (_m *PriceFeedWatcherMock) Subscribe(ctx context.Context, sink chan<- eth.PriceData) { + _m.Called(ctx, sink) +} + +// NewPriceFeedWatcherMock creates a new instance of PriceFeedWatcherMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewPriceFeedWatcherMock(t interface { + mock.TestingT + Cleanup(func()) +}) *PriceFeedWatcherMock { + mock := &PriceFeedWatcherMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/livepeernode.go b/core/livepeernode.go index 57b1055e39..f4741aae93 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -93,7 +93,7 @@ type LivepeerNode struct { StorageConfigs map[string]*transcodeConfig storageMutex *sync.RWMutex // Transcoder private fields - priceInfo map[string]*big.Rat + priceInfo map[string]*AutoConvertedPrice serviceURI url.URL segmentMutex *sync.RWMutex } @@ -109,7 +109,7 @@ func NewLivepeerNode(e eth.LivepeerEthClient, wd string, dbh *common.DB) (*Livep SegmentChans: make(map[ManifestID]SegmentChan), segmentMutex: &sync.RWMutex{}, Capabilities: &Capabilities{capacities: map[Capability]int{}}, - priceInfo: make(map[string]*big.Rat), + priceInfo: make(map[string]*AutoConvertedPrice), StorageConfigs: make(map[string]*transcodeConfig), storageMutex: &sync.RWMutex{}, }, nil @@ -128,12 +128,16 @@ func (n *LivepeerNode) SetServiceURI(newUrl *url.URL) { } // SetBasePrice sets the base price for an orchestrator on the node -func (n *LivepeerNode) SetBasePrice(b_eth_addr string, price *big.Rat) { +func (n *LivepeerNode) SetBasePrice(b_eth_addr string, price *AutoConvertedPrice) { addr := strings.ToLower(b_eth_addr) n.mu.Lock() defer n.mu.Unlock() + prevPrice := n.priceInfo[addr] n.priceInfo[addr] = price + if prevPrice != nil { + prevPrice.Stop() + } } // GetBasePrice gets the base price for an orchestrator @@ -142,14 +146,22 @@ func (n *LivepeerNode) GetBasePrice(b_eth_addr string) *big.Rat { n.mu.RLock() defer n.mu.RUnlock() - return n.priceInfo[addr] + price := n.priceInfo[addr] + if price == nil { + return nil + } + return price.Value() } func (n *LivepeerNode) GetBasePrices() map[string]*big.Rat { n.mu.RLock() defer n.mu.RUnlock() - return n.priceInfo + prices := make(map[string]*big.Rat) + for addr, price := range n.priceInfo { + prices[addr] = price.Value() + } + return prices } // SetMaxFaceValue sets the faceValue upper limit for tickets received diff --git a/core/livepeernode_test.go b/core/livepeernode_test.go index 259992f892..230f8dd421 100644 --- a/core/livepeernode_test.go +++ b/core/livepeernode_test.go @@ -162,8 +162,8 @@ func TestSetAndGetBasePrice(t *testing.T) { price := big.NewRat(1, 1) - n.SetBasePrice("default", price) - assert.Zero(n.priceInfo["default"].Cmp(price)) + n.SetBasePrice("default", NewFixedPrice(price)) + assert.Zero(n.priceInfo["default"].Value().Cmp(price)) assert.Zero(n.GetBasePrice("default").Cmp(price)) assert.Zero(n.GetBasePrices()["default"].Cmp(price)) @@ -172,10 +172,10 @@ func TestSetAndGetBasePrice(t *testing.T) { price1 := big.NewRat(2, 1) price2 := big.NewRat(3, 1) - n.SetBasePrice(addr1, price1) - n.SetBasePrice(addr2, price2) - assert.Zero(n.priceInfo[addr1].Cmp(price1)) - assert.Zero(n.priceInfo[addr2].Cmp(price2)) + n.SetBasePrice(addr1, NewFixedPrice(price1)) + n.SetBasePrice(addr2, NewFixedPrice(price2)) + assert.Zero(n.priceInfo[addr1].Value().Cmp(price1)) + assert.Zero(n.priceInfo[addr2].Value().Cmp(price2)) assert.Zero(n.GetBasePrices()[addr1].Cmp(price1)) assert.Zero(n.GetBasePrices()[addr2].Cmp(price2)) } diff --git a/core/orch_test.go b/core/orch_test.go index 981661433d..dcc8a6c2d9 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -704,7 +704,7 @@ func TestProcessPayment_GivenRecipientError_ReturnsNil(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) recipient.On("TxCostMultiplier", mock.Anything).Return(big.NewRat(1, 1), nil) recipient.On("ReceiveTicket", mock.Anything, mock.Anything, mock.Anything).Return("", false, nil) @@ -785,7 +785,7 @@ func TestProcessPayment_ActiveOrchestrator(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) // orchestrator inactive -> error err := orch.ProcessPayment(context.Background(), defaultPayment(t), ManifestID("some manifest")) @@ -856,7 +856,7 @@ func TestProcessPayment_GivenLosingTicket_DoesNotRedeem(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) recipient.On("TxCostMultiplier", mock.Anything).Return(big.NewRat(1, 1), nil) recipient.On("ReceiveTicket", mock.Anything, mock.Anything, mock.Anything).Return("some sessionID", false, nil) @@ -888,7 +888,7 @@ func TestProcessPayment_GivenWinningTicket_RedeemError(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") sessionID := "some sessionID" @@ -928,7 +928,7 @@ func TestProcessPayment_GivenWinningTicket_Redeems(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") sessionID := "some sessionID" @@ -968,7 +968,7 @@ func TestProcessPayment_GivenMultipleWinningTickets_RedeemsAll(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") sessionID := "some sessionID" @@ -1038,7 +1038,7 @@ func TestProcessPayment_GivenConcurrentWinningTickets_RedeemsAll(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestIDs := make([]string, 5) @@ -1097,7 +1097,7 @@ func TestProcessPayment_GivenReceiveTicketError_ReturnsError(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") @@ -1165,7 +1165,7 @@ func TestProcessPayment_PaymentError_DoesNotIncreaseCreditBalance(t *testing.T) } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") paymentError := errors.New("ReceiveTicket error") @@ -1227,7 +1227,7 @@ func TestSufficientBalance_IsSufficient_ReturnsTrue(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") @@ -1265,7 +1265,7 @@ func TestSufficientBalance_IsNotSufficient_ReturnsFalse(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") @@ -1307,7 +1307,7 @@ func TestSufficientBalance_OffChainMode_ReturnsTrue(t *testing.T) { func TestTicketParams(t *testing.T) { n, _ := NewLivepeerNode(nil, "", nil) - n.priceInfo["default"] = big.NewRat(1, 1) + n.priceInfo["default"] = NewFixedPrice(big.NewRat(1, 1)) priceInfo := &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 1} recipient := new(pm.MockRecipient) n.Recipient = recipient @@ -1388,7 +1388,7 @@ func TestPriceInfo(t *testing.T) { expPricePerPixel := big.NewRat(101, 100) n, _ := NewLivepeerNode(nil, "", nil) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient := new(pm.MockRecipient) n.Recipient = recipient @@ -1406,7 +1406,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10/1, txMultiplier = 100/1 => expPricePerPixel = 1010/100 basePrice = big.NewRat(10, 1) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(1010, 100) @@ -1421,7 +1421,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 1/10, txMultiplier = 100 => expPricePerPixel = 101/1000 basePrice = big.NewRat(1, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(101, 1000) @@ -1435,7 +1435,7 @@ func TestPriceInfo(t *testing.T) { assert.Equal(priceInfo.PixelsPerUnit, expPrice.Denom().Int64()) // basePrice = 25/10 , txMultiplier = 100 => expPricePerPixel = 2525/1000 basePrice = big.NewRat(25, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(2525, 1000) @@ -1451,7 +1451,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10/1 , txMultiplier = 100/10 => expPricePerPixel = 11 basePrice = big.NewRat(10, 1) txMultiplier = big.NewRat(100, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient = new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) @@ -1470,7 +1470,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10/1 , txMultiplier = 1/10 => expPricePerPixel = 110 basePrice = big.NewRat(10, 1) txMultiplier = big.NewRat(1, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient = new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) @@ -1489,7 +1489,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10, txMultiplier = 1 => expPricePerPixel = 20 basePrice = big.NewRat(10, 1) txMultiplier = big.NewRat(1, 1) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient = new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) @@ -1506,7 +1506,7 @@ func TestPriceInfo(t *testing.T) { assert.Equal(priceInfo.PixelsPerUnit, expPrice.Denom().Int64()) // basePrice = 0 => expPricePerPixel = 0 - n.SetBasePrice("default", big.NewRat(0, 1)) + n.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) orch = NewOrchestrator(n, nil) priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") @@ -1516,7 +1516,7 @@ func TestPriceInfo(t *testing.T) { // test no overflows basePrice = big.NewRat(25000, 1) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) faceValue, _ := new(big.Int).SetString("22245599237119512", 10) txCost := new(big.Int).Mul(big.NewInt(100000), big.NewInt(7500000000)) txMultiplier = new(big.Rat).SetFrac(faceValue, txCost) // 926899968213313/31250000000000 @@ -1572,7 +1572,7 @@ func TestPriceInfo_TxMultiplierError_ReturnsError(t *testing.T) { expError := errors.New("TxMultiplier Error") n, _ := NewLivepeerNode(nil, "", nil) - n.SetBasePrice("default", big.NewRat(1, 1)) + n.SetBasePrice("default", NewFixedPrice(big.NewRat(1, 1))) recipient := new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(nil, expError) diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index c52ce7ce61..c7e2b1d3ac 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -608,11 +608,11 @@ func TestNewOrchestratorPoolWithPred_TestPredicate(t *testing.T) { assert.True(t, pool.pred(oInfo)) // Set server.BroadcastCfg.maxPrice higher than PriceInfo , should return true - server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(10, 1))) assert.True(t, pool.pred(oInfo)) // Set MaxBroadcastPrice lower than PriceInfo, should return false - server.BroadcastCfg.SetMaxPrice(big.NewRat(1, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 1))) assert.False(t, pool.pred(oInfo)) // PixelsPerUnit is 0 , return false @@ -629,7 +629,7 @@ func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsEmptyList(t *testing.T) expTranscoder := "transcoderFromTest" expPricePerPixel, _ := common.PriceToFixed(big.NewRat(999, 1)) - server.BroadcastCfg.SetMaxPrice(big.NewRat(1, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 1))) gmp := runtime.GOMAXPROCS(50) defer runtime.GOMAXPROCS(gmp) var mu sync.Mutex @@ -823,7 +823,7 @@ func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing. }, } - server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(10, 1))) gmp := runtime.GOMAXPROCS(50) defer runtime.GOMAXPROCS(gmp) var mu sync.Mutex diff --git a/eth/watchers/pricefeedwatcher.go b/eth/watchers/pricefeedwatcher.go index ec7a81f2f3..1bab9dc130 100644 --- a/eth/watchers/pricefeedwatcher.go +++ b/eth/watchers/pricefeedwatcher.go @@ -19,78 +19,93 @@ const ( priceUpdatePeriod = 1 * time.Hour ) +type PriceFeedWatcher interface { + Currencies() (base string, quote string, err error) + Current() (eth.PriceData, error) + Subscribe(ctx context.Context, sink chan<- eth.PriceData) +} + // PriceFeedWatcher monitors a Chainlink PriceFeed for updated pricing info. It // allows fetching the current price as well as listening for updates on the // PriceUpdated channel. -type PriceFeedWatcher struct { +type priceFeedWatcher struct { baseRetryDelay time.Duration - priceFeed eth.PriceFeedEthClient - currencyBase, currencyQuote string + priceFeed eth.PriceFeedEthClient + + mu sync.RWMutex + current eth.PriceData + cancelWatch func() - mu sync.RWMutex - current eth.PriceData priceEventFeed event.Feed + subscriptions event.SubscriptionScope } // NewPriceFeedWatcher creates a new PriceFeedWatcher instance. It will already // fetch the current price and start a goroutine to watch for updates. -func NewPriceFeedWatcher(ethClient *ethclient.Client, priceFeedAddr string) (*PriceFeedWatcher, error) { +func NewPriceFeedWatcher(ethClient *ethclient.Client, priceFeedAddr string) (PriceFeedWatcher, error) { priceFeed, err := eth.NewPriceFeedEthClient(ethClient, priceFeedAddr) if err != nil { return nil, fmt.Errorf("failed to create price feed client: %w", err) } - - description, err := priceFeed.Description() - if err != nil { - return nil, fmt.Errorf("failed to get description: %w", err) - } - - currencyFrom, currencyTo, err := parseCurrencies(description) - if err != nil { - return nil, err - } - - w := &PriceFeedWatcher{ + w := &priceFeedWatcher{ baseRetryDelay: priceUpdateBaseRetryDelay, priceFeed: priceFeed, - currencyBase: currencyFrom, - currencyQuote: currencyTo, - } - - err = w.updatePrice() - if err != nil { - return nil, fmt.Errorf("failed to update price: %w", err) } - return w, nil } // Currencies returns the base and quote currencies of the price feed. // i.e. base = CurrentPrice() * quote -func (w *PriceFeedWatcher) Currencies() (base string, quote string) { - return w.currencyBase, w.currencyQuote +func (w *priceFeedWatcher) Currencies() (base string, quote string, err error) { + description, err := w.priceFeed.Description() + if err != nil { + return "", "", fmt.Errorf("failed to get description: %w", err) + } + + base, quote, err = parseCurrencies(description) + if err != nil { + return "", "", err + } + return } -// Current returns the latest fetched price data. -func (w *PriceFeedWatcher) Current() eth.PriceData { +// Current returns the latest fetched price data, or fetches it in case it has +// not been fetched yet. +func (w *priceFeedWatcher) Current() (eth.PriceData, error) { w.mu.RLock() - defer w.mu.RUnlock() - return w.current + current := w.current + w.mu.RUnlock() + if current.UpdatedAt.IsZero() { + return w.updatePrice() + } + return current, nil } // Subscribe allows one to subscribe to price updates emitted by the Watcher. -// To unsubscribe, simply call `Unsubscribe` on the returned subscription. // The sink channel should have ample buffer space to avoid blocking other -// subscribers. Slow subscribers are not dropped. -func (w *PriceFeedWatcher) Subscribe(sub chan<- eth.PriceData) event.Subscription { - return w.priceEventFeed.Subscribe(sub) +// subscribers. Slow subscribers are not dropped. The subscription is kept alive +// until the passed Context is cancelled. +// +// The watch loop is run automatically while there are active subscriptions. It +// will be started when the first subscription is made and is automatically +// stopped when the last subscription is closed. +func (w *priceFeedWatcher) Subscribe(ctx context.Context, sink chan<- eth.PriceData) { + w.mu.Lock() + defer w.mu.Unlock() + w.ensureWatchLocked() + + sub := w.subscriptions.Track(w.priceEventFeed.Subscribe(sink)) + go w.handleUnsubscribe(ctx, sub) } -func (w *PriceFeedWatcher) updatePrice() error { +// updatePrice fetches the latest price data from the price feed and updates the +// current price if it is newer. If the price is updated, it will also send the +// updated price to the price event feed. +func (w *priceFeedWatcher) updatePrice() (eth.PriceData, error) { newPrice, err := w.priceFeed.FetchPriceData() if err != nil { - return fmt.Errorf("failed to fetch price data: %w", err) + return eth.PriceData{}, fmt.Errorf("failed to fetch price data: %w", err) } if newPrice.UpdatedAt.After(w.current.UpdatedAt) { @@ -100,26 +115,62 @@ func (w *PriceFeedWatcher) updatePrice() error { w.priceEventFeed.Send(newPrice) } - return nil + return newPrice, nil } -// Watch starts the watch process. It will periodically poll the price feed for -// price updates until the given context is canceled. Typically, you want to -// call Watch inside a goroutine. -func (w *PriceFeedWatcher) Watch(ctx context.Context) { +// ensureWatchLocked makes sure that the watch process is running. It assumes it +// is already running in a locked context (w.mu). The watch process itself will +// run in background and periodically poll the price feed for updates until the +// `w.cancelWatch` function is called. +func (w *priceFeedWatcher) ensureWatchLocked() { + if w.cancelWatch != nil { + // already running + return + } + ctx, cancel := context.WithCancel(context.Background()) + w.cancelWatch = cancel + ticker := newTruncatedTicker(ctx, priceUpdatePeriod) - w.watchTicker(ctx, ticker) + go w.watchTicker(ctx, ticker) +} + +// handleUnsubscribe waits for the provided Context to be done and then closes +// the given subscription. It then stops the watch process if there are no more +// active subscriptions. +func (w *priceFeedWatcher) handleUnsubscribe(ctx context.Context, sub event.Subscription) { +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-sub.Err(): + clog.Errorf(ctx, "PriceFeedWatcher subscription error: %v", sub.Err()) + } + } + w.mu.Lock() + defer w.mu.Unlock() + + sub.Unsubscribe() + if w.subscriptions.Count() == 0 && w.cancelWatch != nil { + w.cancelWatch() + w.cancelWatch = nil + } } -func (w *PriceFeedWatcher) watchTicker(ctx context.Context, ticker <-chan time.Time) { +// watchTicker is the main loop that periodically fetches the latest price data +// from the price feed. It's lifecycle is handled through the ensureWatch and +// handleUnsubscribe functions. +func (w *priceFeedWatcher) watchTicker(ctx context.Context, ticker <-chan time.Time) { + clog.V(6).Infof(ctx, "Starting PriceFeed watch loop") for { select { case <-ctx.Done(): + clog.V(6).Infof(ctx, "Stopping PriceFeed watch loop") return case <-ticker: attempt, retryDelay := 1, w.baseRetryDelay for { - err := w.updatePrice() + _, err := w.updatePrice() if err == nil { break } else if attempt >= priceUpdateMaxRetries { @@ -159,7 +210,8 @@ func parseCurrencies(description string) (currencyBase string, currencyQuote str func newTruncatedTicker(ctx context.Context, d time.Duration) <-chan time.Time { ch := make(chan time.Time, 1) go func() { - defer close(ch) + // Do not close the channel, to prevent a concurrent goroutine reading from + // the channel from seeing an erroneous "tick" after its closed. nextTick := time.Now().UTC().Truncate(d) for { diff --git a/eth/watchers/pricefeedwatcher_test.go b/eth/watchers/pricefeedwatcher_test.go index 20e09578e1..27eb5d24d0 100644 --- a/eth/watchers/pricefeedwatcher_test.go +++ b/eth/watchers/pricefeedwatcher_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/big" + "reflect" "testing" "time" @@ -37,18 +38,16 @@ func TestPriceFeedWatcher_UpdatePrice(t *testing.T) { } priceFeedMock.On("FetchPriceData").Return(priceData, nil).Once() - w := &PriceFeedWatcher{ - priceFeed: priceFeedMock, - currencyBase: "ETH", - currencyQuote: "USD", - } + w := &priceFeedWatcher{priceFeed: priceFeedMock} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() priceUpdated := make(chan eth.PriceData, 1) - sub := w.Subscribe(priceUpdated) - defer sub.Unsubscribe() + w.Subscribe(ctx, priceUpdated) - require.NoError(t, w.updatePrice()) - require.Equal(t, priceData, w.current) + newPrice, err := w.updatePrice() + require.NoError(t, err) + require.Equal(t, priceData, newPrice) select { case updatedPrice := <-priceUpdated: @@ -58,20 +57,59 @@ func TestPriceFeedWatcher_UpdatePrice(t *testing.T) { } } -func TestPriceFeedWatcher_Watch(t *testing.T) { +func TestPriceFeedWatcher_Subscribe(t *testing.T) { require := require.New(t) priceFeedMock := new(mockPriceFeedEthClient) defer priceFeedMock.AssertExpectations(t) - w := &PriceFeedWatcher{ - priceFeed: priceFeedMock, - currencyBase: "ETH", - currencyQuote: "USD", + w := &priceFeedWatcher{priceFeed: priceFeedMock} + + // Start a bunch of subscriptions and make sure only 1 watch loop gets started + observedCancelWatch := []context.CancelFunc{} + cancelSub := []context.CancelFunc{} + for i := 0; i < 5; i++ { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + w.Subscribe(ctx, make(chan eth.PriceData, 1)) + + observedCancelWatch = append(observedCancelWatch, w.cancelWatch) + cancelSub = append(cancelSub, cancel) + } + + require.NotNil(w.cancelWatch) + for i := range observedCancelWatch { + require.Equal(reflect.ValueOf(w.cancelWatch).Pointer(), reflect.ValueOf(observedCancelWatch[i]).Pointer()) } + // Stop all but the last subscription and ensure watch loop stays running + for i := 0; i < 4; i++ { + cancelSub[i]() + require.NotNil(w.cancelWatch) + } + + // Now stop the last subscription and ensure watch loop gets stopped + cancelSub[4]() + time.Sleep(1 * time.Second) + require.Nil(w.cancelWatch) + + // Finally, just make sure it can be started again after having been stopped + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + w.Subscribe(ctx, make(chan eth.PriceData, 1)) + require.NotNil(w.cancelWatch) +} + +func TestPriceFeedWatcher_Watch(t *testing.T) { + require := require.New(t) + priceFeedMock := new(mockPriceFeedEthClient) + defer priceFeedMock.AssertExpectations(t) + + w := &priceFeedWatcher{priceFeed: priceFeedMock} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() priceUpdated := make(chan eth.PriceData, 1) - sub := w.Subscribe(priceUpdated) - defer sub.Unsubscribe() + w.Subscribe(ctx, priceUpdated) priceData := eth.PriceData{ RoundID: 10, @@ -100,8 +138,6 @@ func TestPriceFeedWatcher_Watch(t *testing.T) { // Start the watch loop fakeTicker := make(chan time.Time, 10) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go func() { w.watchTicker(ctx, fakeTicker) }() @@ -150,21 +186,18 @@ func TestPriceFeedWatcher_WatchErrorRetries(t *testing.T) { } priceFeedMock.On("FetchPriceData").Return(priceData, nil) - w := &PriceFeedWatcher{ + w := &priceFeedWatcher{ baseRetryDelay: 5 * time.Millisecond, priceFeed: priceFeedMock, - currencyBase: "ETH", - currencyQuote: "USD", } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() priceUpdated := make(chan eth.PriceData, 1) - sub := w.Subscribe(priceUpdated) - defer sub.Unsubscribe() + w.Subscribe(ctx, priceUpdated) // Start watch loop fakeTicker := make(chan time.Time, 10) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go func() { w.watchTicker(ctx, fakeTicker) }() diff --git a/server/broadcast.go b/server/broadcast.go index 5658f0cda6..2edcd4f58d 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -56,7 +56,7 @@ var submitMultiSession = func(ctx context.Context, sess *BroadcastSession, seg * var maxTranscodeAttempts = errors.New("hit max transcode attempts") type BroadcastConfig struct { - maxPrice *big.Rat + maxPrice *core.AutoConvertedPrice mu sync.RWMutex } @@ -68,16 +68,19 @@ type SegFlightMetadata struct { func (cfg *BroadcastConfig) MaxPrice() *big.Rat { cfg.mu.RLock() defer cfg.mu.RUnlock() - return cfg.maxPrice + if cfg.maxPrice == nil { + return nil + } + return cfg.maxPrice.Value() } -func (cfg *BroadcastConfig) SetMaxPrice(price *big.Rat) { +func (cfg *BroadcastConfig) SetMaxPrice(price *core.AutoConvertedPrice) { cfg.mu.Lock() defer cfg.mu.Unlock() + prevPrice := cfg.maxPrice cfg.maxPrice = price - - if monitor.Enabled { - monitor.MaxTranscodingPrice(price) + if prevPrice != nil { + prevPrice.Stop() } } diff --git a/server/handlers.go b/server/handlers.go index 7461065fc1..7514de56ff 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -20,6 +20,7 @@ import ( "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-livepeer/eth" "github.com/livepeer/go-livepeer/eth/types" + "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-livepeer/pm" "github.com/livepeer/lpms/ffmpeg" "github.com/pkg/errors" @@ -125,6 +126,7 @@ func setBroadcastConfigHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { pricePerUnit := r.FormValue("maxPricePerUnit") pixelsPerUnit := r.FormValue("pixelsPerUnit") + currency := r.FormValue("currency") transcodingOptions := r.FormValue("transcodingOptions") if (pricePerUnit == "" || pixelsPerUnit == "") && transcodingOptions == "" { @@ -134,28 +136,38 @@ func setBroadcastConfigHandler() http.Handler { // set max price if pricePerUnit != "" && pixelsPerUnit != "" { - pr, err := strconv.ParseInt(pricePerUnit, 10, 64) - if err != nil { - respond400(w, errors.Wrapf(err, "Error converting string to int64").Error()) + pr, ok := new(big.Rat).SetString(pricePerUnit) + if !ok { + respond400(w, fmt.Sprintf("Error parsing pricePerUnit value: %s", pricePerUnit)) return } - px, err := strconv.ParseInt(pixelsPerUnit, 10, 64) - if err != nil { - respond400(w, errors.Wrapf(err, "Error converting string to int64").Error()) + px, ok := new(big.Rat).SetString(pixelsPerUnit) + if !ok { + respond400(w, fmt.Sprintf("Error parsing pixelsPerUnit value: %s", pixelsPerUnit)) return } - if px <= 0 { - respond400(w, fmt.Sprintf("pixels per unit must be greater than 0, provided %d", px)) + if px.Sign() <= 0 { + respond400(w, fmt.Sprintf("pixels per unit must be greater than 0, provided %v", pixelsPerUnit)) return } - - var price *big.Rat - if pr > 0 { - price = big.NewRat(pr, px) + pricePerPixel := new(big.Rat).Quo(pr, px) + + var autoPrice *core.AutoConvertedPrice + if pricePerPixel.Sign() > 0 { + var err error + autoPrice, err = core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + if monitor.Enabled { + monitor.MaxTranscodingPrice(price) + } + glog.Infof("Maximum transcoding price: %v wei per pixel\n", price.FloatString(3)) + }) + if err != nil { + respond400(w, errors.Wrap(err, "error converting price").Error()) + return + } } - BroadcastCfg.SetMaxPrice(price) - glog.Infof("Maximum transcoding price: %d per %q pixels\n", pr, px) + BroadcastCfg.SetMaxPrice(autoPrice) } // set broadcast profiles @@ -291,7 +303,8 @@ func (s *LivepeerServer) activateOrchestratorHandler(client eth.LivepeerEthClien return } - if err := s.setOrchestratorPriceInfo("default", r.FormValue("pricePerUnit"), r.FormValue("pixelsPerUnit")); err != nil { + pricePerUnit, pixelsPerUnit, currency := r.FormValue("pricePerUnit"), r.FormValue("pixelsPerUnit"), r.FormValue("currency") + if err := s.setOrchestratorPriceInfo("default", pricePerUnit, pixelsPerUnit, currency); err != nil { respond400(w, err.Error()) return } @@ -385,8 +398,9 @@ func (s *LivepeerServer) setOrchestratorConfigHandler(client eth.LivepeerEthClie return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { pixels := r.FormValue("pixelsPerUnit") price := r.FormValue("pricePerUnit") + currency := r.FormValue("currency") if pixels != "" && price != "" { - if err := s.setOrchestratorPriceInfo("default", price, pixels); err != nil { + if err := s.setOrchestratorPriceInfo("default", price, pixels, currency); err != nil { respond400(w, err.Error()) return } @@ -458,53 +472,43 @@ func (s *LivepeerServer) setOrchestratorConfigHandler(client eth.LivepeerEthClie })) } -func (s *LivepeerServer) setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr string) error { - ok, err := regexp.MatchString("^[0-9]+$", pricePerUnitStr) +func (s *LivepeerServer) setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr, currency string) error { + ok, err := regexp.MatchString("^0x[0-9a-fA-F]{40}|default$", broadcasterEthAddr) if err != nil { return err } if !ok { - return fmt.Errorf("pricePerUnit is not a valid integer, provided %v", pricePerUnitStr) + return fmt.Errorf("broadcasterEthAddr is not a valid eth address, provided %v", broadcasterEthAddr) } - ok, err = regexp.MatchString("^[0-9]+$", pixelsPerUnitStr) - if err != nil { - return err - } + pricePerUnit, ok := new(big.Rat).SetString(pricePerUnitStr) if !ok { - return fmt.Errorf("pixelsPerUnit is not a valid integer, provided %v", pixelsPerUnitStr) + return fmt.Errorf("error parsing pricePerUnit value: %s", pricePerUnitStr) } - - ok, err = regexp.MatchString("^0x[0-9a-fA-F]{40}|default$", broadcasterEthAddr) - if err != nil { - return err - } - if !ok { - return fmt.Errorf("broadcasterEthAddr is not a valid eth address, provided %v", broadcasterEthAddr) + if pricePerUnit.Sign() < 0 { + return fmt.Errorf("price unit must be greater than or equal to 0, provided %s", pricePerUnitStr) } - pricePerUnit, err := strconv.ParseInt(pricePerUnitStr, 10, 64) - if err != nil { - return fmt.Errorf("error converting pricePerUnit string to int64: %v", err) + pixelsPerUnit, ok := new(big.Rat).SetString(pixelsPerUnitStr) + if !ok { + return fmt.Errorf("error parsing pixelsPerUnit value: %v", pixelsPerUnitStr) } - if pricePerUnit < 0 { - return fmt.Errorf("price unit must be greater than or equal to 0, provided %d", pricePerUnit) + if pixelsPerUnit.Sign() <= 0 { + return fmt.Errorf("pixels per unit must be greater than 0, provided %s", pixelsPerUnitStr) } - pixelsPerUnit, err := strconv.ParseInt(pixelsPerUnitStr, 10, 64) + pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + if broadcasterEthAddr == "default" { + glog.Infof("Price: %v wei per pixel\n ", price.FloatString(3)) + } else { + glog.Infof("Price: %v wei per pixel for broadcaster %v", price.FloatString(3), broadcasterEthAddr) + } + }) if err != nil { - return fmt.Errorf("error converting pixelsPerUnit string to int64: %v", err) - } - if pixelsPerUnit <= 0 { - return fmt.Errorf("pixels per unit must be greater than 0, provided %d", pixelsPerUnit) - } - - s.LivepeerNode.SetBasePrice(broadcasterEthAddr, big.NewRat(pricePerUnit, pixelsPerUnit)) - if broadcasterEthAddr == "default" { - glog.Infof("Price per pixel set to %d wei for %d pixels\n", pricePerUnit, pixelsPerUnit) - } else { - glog.Infof("Price per pixel set to %d wei for %d pixels for broadcaster %s\n", pricePerUnit, pixelsPerUnit, broadcasterEthAddr) + return fmt.Errorf("error converting price: %v", err) } + s.LivepeerNode.SetBasePrice(broadcasterEthAddr, autoPrice) return nil } @@ -564,9 +568,10 @@ func (s *LivepeerServer) setPriceForBroadcaster() http.Handler { if s.LivepeerNode.NodeType == core.OrchestratorNode { pricePerUnitStr := r.FormValue("pricePerUnit") pixelsPerUnitStr := r.FormValue("pixelsPerUnit") + currency := r.FormValue("currency") broadcasterEthAddr := r.FormValue("broadcasterEthAddr") - err := s.setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr) + err := s.setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr, currency) if err == nil { respondOk(w, []byte(fmt.Sprintf("Price per pixel set to %s wei for %s pixels for broadcaster %s\n", pricePerUnitStr, pixelsPerUnitStr, broadcasterEthAddr))) } else { diff --git a/server/handlers_test.go b/server/handlers_test.go index aa01fe88cc..f7852e3d46 100644 --- a/server/handlers_test.go +++ b/server/handlers_test.go @@ -116,7 +116,7 @@ func TestOrchestratorInfoHandler_Success(t *testing.T) { s := &LivepeerServer{LivepeerNode: n} price := big.NewRat(1, 2) - s.LivepeerNode.SetBasePrice("default", price) + s.LivepeerNode.SetBasePrice("default", core.NewFixedPrice(price)) trans := &types.Transcoder{ ServiceURI: "127.0.0.1:8935", @@ -196,7 +196,7 @@ func TestSetBroadcastConfigHandler_ConvertPricePerUnitError(t *testing.T) { }) assert.Equal(http.StatusBadRequest, status) - assert.Contains(body, "Error converting string to int64") + assert.Contains(body, "Error parsing pricePerUnit value") } func TestSetBroadcastConfigHandler_ConvertPixelsPerUnitError(t *testing.T) { @@ -209,7 +209,7 @@ func TestSetBroadcastConfigHandler_ConvertPixelsPerUnitError(t *testing.T) { }) assert.Equal(http.StatusBadRequest, status) - assert.Contains(body, "Error converting string to int64") + assert.Contains(body, "Error parsing pixelsPerUnit value") } func TestSetBroadcastConfigHandler_NegativePixelPerUnitError(t *testing.T) { @@ -259,7 +259,7 @@ func TestSetBroadcastConfigHandler_Success(t *testing.T) { func TestGetBroadcastConfigHandler(t *testing.T) { assert := assert.New(t) - BroadcastCfg.maxPrice = big.NewRat(1, 2) + BroadcastCfg.maxPrice = core.NewFixedPrice(big.NewRat(1, 2)) BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ ffmpeg.VideoProfileLookup["P240p25fps16x9"], } @@ -501,26 +501,31 @@ func TestSetOrchestratorPriceInfo(t *testing.T) { s := stubServer() // pricePerUnit is not an integer - err := s.setOrchestratorPriceInfo("default", "nil", "1") + err := s.setOrchestratorPriceInfo("default", "nil", "1", "") assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), "pricePerUnit is not a valid integer")) + assert.Contains(t, err.Error(), "error parsing pricePerUnit value") // pixelsPerUnit is not an integer - err = s.setOrchestratorPriceInfo("default", "1", "nil") + err = s.setOrchestratorPriceInfo("default", "1", "nil", "") assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), "pixelsPerUnit is not a valid integer")) + assert.Contains(t, err.Error(), "error parsing pixelsPerUnit value") - err = s.setOrchestratorPriceInfo("default", "1", "1") + // price feed watcher is not initialized and one attempts a custom currency + err = s.setOrchestratorPriceInfo("default", "1e12", "0.7", "USD") + assert.Error(t, err) + assert.Contains(t, err.Error(), "PriceFeedWatcher is not initialized") + + err = s.setOrchestratorPriceInfo("default", "1", "1", "") assert.Nil(t, err) assert.Zero(t, s.LivepeerNode.GetBasePrice("default").Cmp(big.NewRat(1, 1))) - err = s.setOrchestratorPriceInfo("default", "-5", "1") + err = s.setOrchestratorPriceInfo("default", "-5", "1", "") assert.EqualErrorf(t, err, err.Error(), "price unit must be greater than or equal to 0, provided %d\n", -5) // pixels per unit <= 0 - err = s.setOrchestratorPriceInfo("default", "1", "0") + err = s.setOrchestratorPriceInfo("default", "1", "0", "") assert.EqualErrorf(t, err, err.Error(), "pixels per unit must be greater than 0, provided %d\n", 0) - err = s.setOrchestratorPriceInfo("default", "1", "-5") + err = s.setOrchestratorPriceInfo("default", "1", "-5", "") assert.EqualErrorf(t, err, err.Error(), "pixels per unit must be greater than 0, provided %d\n", -5) } diff --git a/server/rpc_test.go b/server/rpc_test.go index 712568c20d..ab35bb0875 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -550,7 +550,7 @@ func TestGenPayment(t *testing.T) { s.Sender = sender // Test invalid price - BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 5))) payment, err = genPayment(context.TODO(), s, 1) assert.Equal("", payment) assert.Errorf(err, err.Error(), "Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5)) @@ -687,12 +687,12 @@ func TestValidatePrice(t *testing.T) { defer BroadcastCfg.SetMaxPrice(nil) // B MaxPrice > O Price - BroadcastCfg.SetMaxPrice(big.NewRat(5, 1)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(5, 1))) err = validatePrice(s) assert.Nil(err) // B MaxPrice == O Price - BroadcastCfg.SetMaxPrice(big.NewRat(1, 3)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 3))) err = validatePrice(s) assert.Nil(err) @@ -713,7 +713,7 @@ func TestValidatePrice(t *testing.T) { // B MaxPrice < O Price s.InitialPrice = nil - BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 5))) err = validatePrice(s) assert.EqualError(err, fmt.Sprintf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5))) diff --git a/server/segment_rpc_test.go b/server/segment_rpc_test.go index 6feb1a5ba8..f8786202cd 100644 --- a/server/segment_rpc_test.go +++ b/server/segment_rpc_test.go @@ -1679,7 +1679,7 @@ func TestSubmitSegment_GenPaymentError_ValidatePriceError(t *testing.T) { OrchestratorInfo: oinfo, } - BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 5))) defer BroadcastCfg.SetMaxPrice(nil) _, err := SubmitSegment(context.TODO(), s, &stream.HLSSegment{}, nil, 0, false, true) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 59a77b5c88..efc723d602 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -122,7 +122,7 @@ func lpCfg() starter.LivepeerConfig { ethPassword := "" network := "devnet" blockPollingInterval := 1 - pricePerUnit := 1 + pricePerUnit := "1" initializeRound := true cfg := starter.DefaultLivepeerConfig()