Skip to content

Commit

Permalink
Merge v2 networking and partial unbonding
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh committed Aug 2, 2018
2 parents 5d0e059 + a005eca commit 2e9b5da
Show file tree
Hide file tree
Showing 34 changed files with 4,710 additions and 714 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
all: server/lp_rpc.pb.go

server/lp_rpc.pb.go: server/lp_rpc.proto
protoc -I=. --go_out=plugins=grpc:. $^
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ You can also build the executables from scratch.

### Broadcasting

To broadcast, run `./livepeer_cli` and pick 'Broadcast Video'.
* You should see your webcam becoming active and a manifestID printed on the screen.
For full details, read the [Broadcasting guide](http://livepeer.readthedocs.io/en/latest/broadcasting.html).

Sometimes you want to use third-party broadcasting software, especially if you are running the software on Windows or Linux. Livepeer can take any RTMP stream as input, so you can use other popular streaming software to create the video stream. We recommend [OBS](https://obsproject.com/download) or [ffmpeg](https://www.ffmpeg.org/).

By default, the RTMP port is 1935. For example, if you are using OSX with ffmpeg, run

`ffmpeg -f avfoundation -framerate 30 -pixel_format uyvy422 -i "0:0" -vcodec libx264 -tune zerolatency -b 1000k -x264-params keyint=60:min-keyint=60 -acodec aac -ac 1 -b:a 96k -f flv rtmp://localhost:1935/movie`

Similarly, you can use OBS, and change the setting->stream->URL to `rtmp://localhost:1935/movie`
Similarly, you can use OBS, and change the Settings->Stream->URL to `rtmp://localhost:1935/movie` , along with the keyframe interval to 4 seconds, via `Settings -> Output -> Output Mode (Advanced) -> Streaming tab -> Keyframe Interval 4`.

If the broadcast is successful, you should be able to get a streamID by querying the local node:

Expand All @@ -82,7 +81,7 @@ For example, after you get the streamID, you can view the stream by running:

### Becoming a Transcoder

We'll walk through the steps of becoming a transcoder on the test network. To learn more about the transcoder, refer to the [Livepeer whitepaper](https://github.com/livepeer/wiki/blob/master/WHITEPAPER.md)
We'll walk through the steps of becoming a transcoder on the test network. To learn more about the transcoder, refer to the [Livepeer whitepaper](https://github.com/livepeer/wiki/blob/master/WHITEPAPER.md) and the [Transcoding guide](http://livepeer.readthedocs.io/en/latest/transcoding.html).

- `livepeer --rinkeby --transcoder` to start the node as a transcoder.

Expand All @@ -94,6 +93,7 @@ We'll walk through the steps of becoming a transcoder on the test network. To l

- Wait for the next round to start, and your transcoder will become active.

- If running on Rinkeby or mainnet, ensure your transcoder is publicly available in order to receive jobs from broadcasters.

## Contribution
Thank you for your interest in contributing to the core software of Livepeer.
Expand Down
85 changes: 53 additions & 32 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"fmt"
"io/ioutil"
"math/big"
"net/http"
"net/url"
"os"
"os/signal"
"os/user"
Expand Down Expand Up @@ -91,7 +93,7 @@ func main() {
ipfsPath := flag.String("ipfsPath", fmt.Sprintf("%v/.ipfs", usr.HomeDir), "IPFS path")
noIPFSLogFiles := flag.Bool("noIPFSLogFiles", false, "Set to true if log files should not be generated")
offchain := flag.Bool("offchain", false, "Set to true to start the node in offchain mode")
publicIP := flag.String("publicIP", "", "Explicit set node IP address so nodes that need a well-known address can advertise it to the network")
publicAddr := flag.String("publicAddr", "", "Public address that broadcasters can use to contact this node; may be an IP or hostname. If used, should match the on-chain ServiceURI set via livepeer_cli")
initializeRound := flag.Bool("initializeRound", false, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
version := flag.Bool("version", false, "Print out the version")

Expand All @@ -111,11 +113,7 @@ func main() {
}
if !*offchain {
if *ethUrl == "" {
if *transcoder {
*ethUrl = "wss://rinkeby.infura.io/ws"
} else {
*ethUrl = "https://rinkeby.infura.io/cFwU3koCZdTqiH6VE4fj"
}
*ethUrl = "wss://rinkeby.infura.io/ws"
}
if *controllerAddr == "" {
*controllerAddr = RinkebyControllerAddr
Expand All @@ -132,11 +130,7 @@ func main() {
}
if !*offchain {
if *ethUrl == "" {
if *transcoder {
*ethUrl = "wss://mainnet.infura.io/ws"
} else {
*ethUrl = "https://mainnet.infura.io/cFwU3koCZdTqiH6VE4fj"
}
*ethUrl = "wss://mainnet.infura.io/ws"
}
if *controllerAddr == "" {
*controllerAddr = MainnetControllerAddr
Expand Down Expand Up @@ -191,15 +185,7 @@ func main() {
glog.Errorf("Error creating a new node: %v", err)
return
}
if *transcoder && *publicIP == "" {
glog.Errorf("Error - transcoder needs to specify publicIP")
return
}
if *transcoder && *publicIP == "" {
glog.Errorf("Error - transcoder needs to specify publicIP")
return
}
nw, err := bnet.NewBasicVideoNetwork(node, *publicIP, *port)
nw, err := bnet.NewBasicVideoNetwork(node, "127.0.0.1", *port)
if err != nil {
glog.Errorf("Cannot create network node: %v", err)
return
Expand Down Expand Up @@ -326,19 +312,31 @@ func main() {
return true
})

if *transcoder {
addrMap := n.Eth.ContractAddresses()
em := eth.NewEventMonitor(backend, addrMap)
defer n.StopEthServices()

addrMap := n.Eth.ContractAddresses()
em := eth.NewEventMonitor(backend, addrMap)

// Setup block service to receive headers from the head of the chain
n.EthServices["BlockService"] = eventservices.NewBlockService(em, dbh)
// Setup unbonding service to manage unbonding locks
n.EthServices["UnbondingService"] = eventservices.NewUnbondingService(n.Eth, dbh)

if *transcoder {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := setupTranscoder(ctx, n, em, *ipfsPath, *initializeRound); err != nil {
if err := setupTranscoder(ctx, n, em, *ipfsPath, *initializeRound, *publicAddr); err != nil {
glog.Errorf("Error setting up transcoder: %v", err)
return
}
}

defer n.StopEthServices()
// Start services
err = n.StartEthServices()
if err != nil {
glog.Errorf("Failed to start ETH services: %v", err)
return
}
}

Expand Down Expand Up @@ -493,7 +491,7 @@ func getLPKeys(datadir string) (crypto.PrivKey, crypto.PubKey, error) {
return priv, pub, nil
}

func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMonitor, ipfsPath string, initializeRound bool) error {
func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMonitor, ipfsPath string, initializeRound bool, publicAddr string) error {
//Check if transcoder is active
active, err := n.Eth.IsActiveTranscoder()
if err != nil {
Expand All @@ -506,6 +504,35 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
glog.Infof("Transcoder %v is active", n.Eth.Account().Address.Hex())
}

if publicAddr == "" {
// TODO probably should put this (along w wizard GETs) into common code
resp, err := http.Get("https://api.ipify.org?format=text")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
glog.Error("Could not look up public IP address")
return err
}
publicAddr = strings.TrimSpace(string(body))
}
uriStr, err := n.Eth.GetServiceURI(n.Eth.Account().Address)
if err != nil {
glog.Error("Could not get service URI")
return err
}
uri, err := url.ParseRequestURI(uriStr)
if err != nil {
glog.Error("Could not parse service URI")
uri, _ = url.ParseRequestURI("http://127.0.0.1")
}
if uri.Hostname() != publicAddr {
glog.Errorf("Service address %v did not match discovered address %v; set the correct address in livepeer_cli or use -publicAddr", uri.Hostname(), publicAddr)
// TODO remove '&& false' after all transcoders have set a service URI
if active && false {
return fmt.Errorf("Mismatched service address")
}
}

// Set up IPFS
ipfsApi, err := ipfs.StartIpfs(ctx, ipfsPath)
if err != nil {
Expand All @@ -531,12 +558,6 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
js := eventservices.NewJobService(em, n)
n.EthServices["JobService"] = js

// Start services
err = n.StartEthServices()
if err != nil {
return err
}

// Restart jobs as necessary
err = js.RestartTranscoder()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer_cli/livepeer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (w *wizard) initializeOptions() []wizardOpt {
{desc: "Invoke \"initialize round\"", invoke: w.initializeRound},
{desc: "Invoke \"bond\"", invoke: w.bond},
{desc: "Invoke \"unbond\"", invoke: w.unbond},
{desc: "Invoke \"rebond\"", invoke: w.rebond},
{desc: "Invoke \"withdraw stake\" (LPT)", invoke: w.withdrawStake},
{desc: "Invoke \"withdraw fees\" (ETH)", invoke: w.withdrawFees},
{desc: "Invoke \"claim\" (for rewards and fees)", invoke: w.claimRewardsAndFees},
Expand Down
19 changes: 19 additions & 0 deletions cmd/livepeer_cli/wizard.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ func (w *wizard) readString() string {
}
}

// readStringAndValidate reads a single line from stdin, trims spaces and
// checks that the string passes a condition defined by the provided validation function
func (w *wizard) readStringAndValidate(validate func(in string) (string, error)) string {
for {
fmt.Printf("> ")
text, err := w.in.ReadString('\n')
if err != nil {
log.Crit("Failed to read user input", "err", err)
}
text = strings.TrimSpace(text)
validText, err := validate(text)
if err != nil {
log.Error("Failed to validate input", "err", err)
continue
}
return validText
}
}

// readDefaultString reads a single line from stdin, trimming if from spaces. If
// an empty line is entered, the default value is returned.
func (w *wizard) readDefaultString(def string) string {
Expand Down
Loading

0 comments on commit 2e9b5da

Please sign in to comment.