Skip to content

Commit

Permalink
WIP: Integrate broadcaster with http client.
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh committed Jun 7, 2018
1 parent 2dc00e6 commit 482db44
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
24 changes: 23 additions & 1 deletion server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func createRTMPStreamIDHandler(s *LivepeerServer) func(url *url.URL) (strmID str

func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
return func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
var rpcBcast *broadcaster
if s.LivepeerNode.Eth != nil {
//Check if round is initialized
initialized, err := s.LivepeerNode.Eth.CurrentRoundInitialized()
Expand Down Expand Up @@ -255,6 +256,9 @@ func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.
if ssb, err := core.SignedSegmentToBytes(core.SignedSegment{Seg: *seg, Sig: sig}); err != nil {
glog.Errorf("Error signing segment: %v", seg.SeqNo)
} else {
if rpcBcast != nil {
SubmitSegment(rpcBcast, string(hlsStrmID), seg)
}
if err := broadcaster.Broadcast(seg.SeqNo, ssb); err != nil {
glog.Errorf("Error broadcasting to network: %v", err)
}
Expand Down Expand Up @@ -310,7 +314,25 @@ func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.

if s.LivepeerNode.Eth != nil {
//Create Transcode Job Onchain
go s.LivepeerNode.CreateTranscodeJob(hlsStrmID, BroadcastJobVideoProfiles, BroadcastPrice)
go func() {
job, err := s.LivepeerNode.CreateTranscodeJob(hlsStrmID, BroadcastJobVideoProfiles, BroadcastPrice)
if err != nil {
return // XXX feed back error?
}
tca, err := s.LivepeerNode.Eth.AssignedTranscoder(job.JobId)
if err != nil {
return // XXX feed back error?
}
serviceUri, err := s.LivepeerNode.Eth.GetServiceURI(tca)
if err != nil || serviceUri == "" {
glog.Error("Unable to retrieve Service URI ", err)
return // XXX feed back error?
}
rpcBcast, err = StartBroadcastClient(serviceUri, s.LivepeerNode)
if err != nil {
return // XXX feed back error?
}
}()
}
return nil
}
Expand Down
48 changes: 44 additions & 4 deletions server/rpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
Expand Down Expand Up @@ -77,10 +78,14 @@ func (o *orchestrator) GetTranscoder(context context.Context, req *TranscoderReq
}

type broadcaster struct {
node *core.LivepeerNode
node *core.LivepeerNode
httpc *http.Client
tinfo *TranscoderInfo
}
type Broadcaster interface {
Sign(string) ([]byte, error)
SetTranscoderInfo(*TranscoderInfo)
GetTranscoderInfo() *TranscoderInfo
}

func (bcast *broadcaster) Sign(msg string) ([]byte, error) {
Expand All @@ -89,6 +94,12 @@ func (bcast *broadcaster) Sign(msg string) ([]byte, error) {
}
return bcast.node.Eth.Sign(crypto.Keccak256([]byte(msg)))
}
func (bcast *broadcaster) GetTranscoderInfo() *TranscoderInfo {
return bcast.tinfo
}
func (bcast *broadcaster) SetTranscoderInfo(t *TranscoderInfo) {
bcast.tinfo = t
}

func genTranscoderReq(b Broadcaster, jid int64) (*TranscoderRequest, error) {
sig, err := b.Sign(fmt.Sprintf("%v", jid))
Expand Down Expand Up @@ -286,7 +297,7 @@ func StartTranscodeServer(bind string, node *core.LivepeerNode) {
http.ListenAndServeTLS(bind, cert, key, &lp)
}

func StartBroadcastClient(orchestratorServer string, node *core.LivepeerNode) (*http.Client, error) {
func StartBroadcastClient(orchestratorServer string, node *core.LivepeerNode) (*broadcaster, error) {
tlsConfig := &tls.Config{InsecureSkipVerify: true}
httpc := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConfig}}
conn, err := grpc.Dial(orchestratorServer,
Expand All @@ -299,13 +310,14 @@ func StartBroadcastClient(orchestratorServer string, node *core.LivepeerNode) (*
c := NewOrchestratorClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
b := broadcaster{node: node}
b := broadcaster{node: node, httpc: httpc}
req, err := genTranscoderReq(&b, 1234)
r, err := c.GetTranscoder(ctx, req)
if err != nil {
log.Fatalf("Could not get transcoder: %v", err)
return nil, err
}
b.tinfo = r
resp, err := httpc.Get(r.Transcoder + "/segment")
if err != nil {
log.Fatalf("Could not get segment response: %v", err)
Expand All @@ -318,7 +330,35 @@ func StartBroadcastClient(orchestratorServer string, node *core.LivepeerNode) (*
return nil, err
}
log.Println(string(data))
return httpc, nil
return &b, nil
}

func SubmitSegment(bcast Broadcaster, streamId string, seg *stream.HLSSegment) {
hc := http.Client{}
segData := &SegData{
Seq: int64(seg.SeqNo),
Hash: crypto.Keccak256(seg.Data),
}
segCreds, err := genSegCreds(bcast, streamId, segData)
if err != nil {
return
}
ti := bcast.GetTranscoderInfo()
req, err := http.NewRequest("POST", ti.Transcoder, bytes.NewBuffer(seg.Data))
if err != nil {
glog.Error("Could not generate trascode request to ", ti.Transcoder)
return
}

req.Header.Set("Authorization", ti.AuthType)
req.Header.Set("Credentials", ti.Credentials)
req.Header.Set("Livepeer-Segment", segCreds)
req.Header.Set("Content-Type", "media/livepeer-videoblob")

resp, err := hc.Do(req)
if err != nil {
glog.Error("Unable to submit segment ", err)
return
}
glog.Info("resp: %v", resp)
}
5 changes: 5 additions & 0 deletions server/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func StubOrchestrator() *stubOrchestrator {
return &stubOrchestrator{priv: pk}
}

func (r *stubOrchestrator) GetTranscoderInfo() *TranscoderInfo {
return nil
}
func (r *stubOrchestrator) SetTranscoderInfo(ti *TranscoderInfo) {
}
func StubBroadcaster2() *stubOrchestrator {
return StubOrchestrator() // lazy; leverage subtyping for interface commonalities
}
Expand Down

0 comments on commit 482db44

Please sign in to comment.