From 0bb05b2b76a9ea49bc16ab85c04d76eadd364bda Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Tue, 26 Feb 2019 14:57:14 +0200 Subject: [PATCH] Webhook URL option - allows authenticate incoming streams through call to webhook --- cmd/livepeer/livepeer.go | 23 ++++++++++++++- server/mediaserver.go | 58 ++++++++++++++++++++++++++++++++++---- server/mediaserver_test.go | 54 +++++++++++++++++++++++++++++++++++ test_args.sh | 10 +++++++ 4 files changed, 138 insertions(+), 7 deletions(-) diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index 75a5ed3377..d0985384a2 100644 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -106,6 +106,9 @@ func main() { gsBucket := flag.String("gsbucket", "", "Google storage bucket") gsKey := flag.String("gskey", "", "Google Storage private key file name (in json format)") + // API + authWebhookURL := flag.String("authWebhookUrl", "", "RTMP authentication webhook URL") + flag.Parse() vFlag.Value.Set(*verbosity) @@ -383,7 +386,10 @@ func main() { // Not a fatal error; may continue operating in segment-only mode glog.Error("No orchestrator specified; transcoding will not happen") } - + var err error + if server.AuthWebhookURL, err = getAuthWebhookURL(*authWebhookURL); err != nil { + glog.Fatal("Error setting auth webhook URL ", err) + } } else if n.NodeType == core.OrchestratorNode { suri, err := getServiceURI(n, *serviceAddr) if err != nil { @@ -499,6 +505,21 @@ func main() { } } +func getAuthWebhookURL(u string) (string, error) { + if u == "" { + return "", nil + } + p, err := url.ParseRequestURI(u) + if err != nil { + return "", err + } + if p.Scheme != "http" && p.Scheme != "https" { + return "", errors.New("Webhook URL should be HTTP or HTTP") + } + glog.Infof("Using webhook url %s", u) + return u, nil +} + // ServiceURI checking steps: // If passed in via -serviceAddr: return that // Else: get inferred address. diff --git a/server/mediaserver.go b/server/mediaserver.go index 45fdbbbae7..68bd6de27c 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -6,8 +6,10 @@ package server import ( "context" "encoding/hex" + "encoding/json" "errors" "fmt" + "io/ioutil" "math/big" "math/rand" "net/http" @@ -53,6 +55,8 @@ const BroadcastRetry = 15 * time.Second var BroadcastPrice = big.NewInt(1) var BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P240p30fps4x3, ffmpeg.P360p30fps16x9} +var AuthWebhookURL string + type rtmpConnection struct { mid core.ManifestID nonce uint64 @@ -85,6 +89,10 @@ type LivepeerServer struct { connectionLock *sync.RWMutex } +type authWebhookResponse struct { + ManifestID string `json:"manifestID"` +} + func NewLivepeerServer(rtmpAddr string, httpAddr string, lpNode *core.LivepeerNode) *LivepeerServer { opts := lpmscore.LPMSOpts{ RtmpAddr: rtmpAddr, RtmpDisabled: true, @@ -146,33 +154,71 @@ func (s *LivepeerServer) StartMediaServer(ctx context.Context, transcodingOption //RTMP Publish Handlers func createRTMPStreamIDHandler(s *LivepeerServer) func(url *url.URL) (strmID string) { return func(url *url.URL) (strmID string) { - //Create a ManifestID - //If manifestID is passed in, use that one + //Check webhook for ManifestID + //If ManifestID is returned from webhook, use it + //Else check URL for ManifestID + //If ManifestID is passed in URL, use that one //Else create one - mid := parseManifestID(url.Query().Get("manifestID")) + var mid core.ManifestID + var err error + if mid, err = authenticateStream(url.String()); err != nil { + glog.Error("Authentication denied for ", err) + return "" + } + + if mid == "" { + mid = parseManifestID(url.Query().Get("manifestID")) + } if mid == "" { mid = core.RandomManifestID() } // Ensure there's no concurrent StreamID with the same name s.connectionLock.RLock() + defer s.connectionLock.RUnlock() if core.MaxSessions > 0 && len(s.rtmpConnections) >= core.MaxSessions { glog.Error("Too many connections") - s.connectionLock.RUnlock() return "" } if _, exists := s.rtmpConnections[mid]; exists { glog.Error("Manifest already exists ", mid) - s.connectionLock.RUnlock() return "" } - s.connectionLock.RUnlock() // Generate RTMP part of StreamID key := hex.EncodeToString(core.RandomIdGenerator(StreamKeyBytes)) return core.MakeStreamIDFromString(string(mid), key).String() } +} +func authenticateStream(url string) (core.ManifestID, error) { + if AuthWebhookURL == "" { + return "", nil + } + payload := fmt.Sprintf(`{"url":"%s"}"`, url) + body := strings.NewReader(payload) + + resp, err := http.Post(AuthWebhookURL, "application/json", body) + if err != nil { + return "", err + } + rbody, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != 200 { + return "", errors.New(resp.Status) + } + if len(rbody) == 0 { + return "", nil + } + var authResp authWebhookResponse + err = json.Unmarshal(rbody, &authResp) + if err != nil { + return "", err + } + if authResp.ManifestID == "" { + return "", errors.New("Empty manifest id not allowed") + } + return core.ManifestID(authResp.ManifestID), nil } func rtmpManifestID(rtmpStrm stream.RTMPVideoStream) core.ManifestID { diff --git a/server/mediaserver_test.go b/server/mediaserver_test.go index d36a26d1ff..69b8198bc6 100644 --- a/server/mediaserver_test.go +++ b/server/mediaserver_test.go @@ -8,6 +8,8 @@ import ( "fmt" "math/big" "math/rand" + "net/http" + "net/http/httptest" "net/url" "sync" "testing" @@ -201,6 +203,58 @@ func TestCreateRTMPStreamHandlerCap(t *testing.T) { core.MaxSessions = oldMaxSessions } +func TestCreateRTMPStreamHandlerWebhook(t *testing.T) { + s := setupServer() + s.RTMPSegmenter = &StubSegmenter{skip: true} + createSid := createRTMPStreamIDHandler(s) + + AuthWebhookURL = "http://localhost:8938/notexisting" + u, _ := url.Parse("http://hot/something/?manifestID=id1") + sid := createSid(u) + if sid != "" { + t.Error("Webhook auth failed") + } + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(nil) + })) + defer ts.Close() + AuthWebhookURL = ts.URL + sid = createSid(u) + if sid == "" { + t.Error("On empty response with 200 code should pass") + } + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"manifestID":""}`)) + })) + defer ts.Close() + AuthWebhookURL = ts2.URL + sid = createSid(u) + if sid != "" { + t.Error("Should not pass in returned manifest id is empty") + } + ts3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{manifestID:"XX"}`)) + })) + defer ts3.Close() + AuthWebhookURL = ts3.URL + sid = createSid(u) + if sid != "" { + t.Error("Should not pass if returned json is invalid") + } + ts4 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"manifestID":"xy"}`)) + })) + defer ts4.Close() + AuthWebhookURL = ts4.URL + sid = createSid(u) + mid := parseManifestID(sid) + if mid != "xy" { + t.Error("Should set manifest id to one provided by webhook") + } + AuthWebhookURL = "" +} + func TestCreateRTMPStreamHandler(t *testing.T) { // Monkey patch rng to avoid unpredictability even when seeding diff --git a/test_args.sh b/test_args.sh index 8ecce599ac..0edae51972 100755 --- a/test_args.sh +++ b/test_args.sh @@ -94,4 +94,14 @@ res=0 ./livepeer -transcoder || res=$? [ $res -ne 0 ] +# exit early if webhhok url is not http +res=0 +./livepeer -broadcaster -authWebhookUrl tcp://host/ || res=$? +[ $res -ne 0 ] + +# exit early if webhook url is not properly formatted +res=0 +./livepeer -broadcaster -authWebhookUrl http\\://host/ || res=$? +[ $res -ne 0 ] + rm -rf "$TMPDIR"