
Re-enable support for multiple streams. #618

Merged: 9 commits into master on Dec 19, 2018

Conversation

@j0sh (Collaborator) commented on Dec 11, 2018

What does this pull request do? Explain your changes. (required)
This enables broadcasters to receive multiple incoming RTMP streams.

Multistream Changes

RNG

Seed RNG when LivepeerNode is initialized, rather than each time RandomIdGenerator is called.
Makes tests more predictable (eg, we can reset the RNG for fixed randomness, such as when testing ID collisions) and mitigates certain types of test failures around concurrency.
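
A minimal sketch of the idea (illustrative only; the actual helper lives in the core package and its exact signature may differ): the generator is a package-level function variable so tests can swap it out, and the seed is set once at node construction rather than per call.

 package main

 import (
        "fmt"
        "math/rand"
        "time"
 )

 // RandomIdGenerator is a package-level function variable, so tests can swap
 // in a deterministic implementation (see the stream key discussion below).
 var RandomIdGenerator = func(length uint) []byte {
        x := make([]byte, length)
        for i := range x {
                x[i] = byte(rand.Uint32())
        }
        return x
 }

 // seedOnce stands in for LivepeerNode initialization: the RNG is seeded a
 // single time here rather than on every RandomIdGenerator call.
 func seedOnce() {
        rand.Seed(time.Now().UnixNano())
 }

 func main() {
        seedOnce()
        fmt.Printf("%x\n", RandomIdGenerator(4))
 }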

Common Struct for Per-Connection State

We add the rtmpConnection struct in lieu of holding similar information in separate maps.

 type rtmpConnection struct {
        nonce  uint64
        stream stream.RTMPVideoStream
        pl     core.PlaylistManager
        sess   *BroadcastSession
 }
 
 type LivepeerServer struct {
  ...
        rtmpConnections map[core.ManifestID]*rtmpConnection
        connectionLock  *sync.Mutex
  ...
 }
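
As a rough usage sketch (the helper names registerConnection and getConnection are illustrative, not from the PR), a connection entry is always read and written under connectionLock:

 func (s *LivepeerServer) registerConnection(mid core.ManifestID, cxn *rtmpConnection) {
        s.connectionLock.Lock()
        defer s.connectionLock.Unlock()
        s.rtmpConnections[mid] = cxn
 }

 func (s *LivepeerServer) getConnection(mid core.ManifestID) (*rtmpConnection, bool) {
        s.connectionLock.Lock()
        defer s.connectionLock.Unlock()
        cxn, ok := s.rtmpConnections[mid]
        return cxn, ok
 }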

RTMP-HLS ManifestID Sharing

Refresher: a StreamID is composed of

<ManifestID><Rendition>

Previously, RTMP StreamIDs were independent of HLS StreamIDs. This offered protection from hijacking, since RTMP URIs were uncorrelated with HLS playback URLs.

To support multi-stream, we incorporate the HLS playlist into the rtmpConnection struct alongside other elements of the RTMP connection.

However, to look up the playlist at playback time, the rtmpConnection needs to be accessible using the HLS StreamID alone, since a playlist is accessed via the HTTP path

/stream/<HLS_StreamID>.m3u8

We solve this by sharing the ManifestID between RTMP and HLS.

However, by sharing the ManifestID for HLS and RTMP, we open ourselves up to stream hijacking due to the easily guessable RTMP StreamID structure, which uses "RTMP" as the rendition:

<RTMP_ManifestID>RTMP

Stream Keys

To mitigate RTMP hijacking based on a known ManifestID, we replace the fixed "RTMP" rendition with a random stream key. This preserves the StreamID construct while restoring most of the security properties of the earlier setup.

<ManifestID><RTMP_StreamKey>
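
A minimal, self-contained sketch of the key generation (randomKey stands in for the node's RNG helper, and the hex encoding is an assumption, not necessarily what the PR does):

 package main

 import (
        "encoding/hex"
        "fmt"
        "math/rand"
 )

 const StreamKeyBytes = 6 // matches the constant introduced in this PR

 // randomKey stands in for the node's RandomIdGenerator helper.
 func randomKey(n int) []byte {
        b := make([]byte, n)
        for i := range b {
                b[i] = byte(rand.Uint32())
        }
        return b
 }

 // rtmpStreamID builds <ManifestID><RTMP_StreamKey>.
 func rtmpStreamID(mid string) string {
        return mid + hex.EncodeToString(randomKey(StreamKeyBytes))
 }

 func main() {
        fmt.Println(rtmpStreamID("abc"))
 }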

Authorizing Incoming RTMP

Note that the stream key only protects playback (eg, via the segmenter); it doesn't prevent unauthorized streams from being ingested.

The current thinking for accomplishing this is to have a callback to some user-specified API. The API can then validate the RTMP path. If validation passes, the path can be used as the Manifest ID.

For example, streaming into the RTMP URL

rtmp://localhost/abc/def

would generate a ManifestID of abc and a stream key of def, for a StreamID of abcdef.
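
A sketch of how the path might be split under that scheme (hypothetical helper; the authorization callback itself is out of scope here):

 package main

 import (
        "fmt"
        "strings"
 )

 // splitRTMPPath turns "/abc/def" into manifest ID "abc" and stream key "def".
 func splitRTMPPath(path string) (manifestID, streamKey string) {
        parts := strings.SplitN(strings.Trim(path, "/"), "/", 2)
        manifestID = parts[0]
        if len(parts) > 1 {
                streamKey = parts[1]
        }
        return
 }

 func main() {
        mid, key := splitRTMPPath("/abc/def")
        fmt.Println(mid, key, mid+key) // abc def abcdef
 }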

Later on we can discuss and experiment with variations on this, such as making the stream key optional during authorization and generating one if it is omitted from the path.

Alternative: Introduce a layer of indirection rather than sharing ManifestIDs between RTMP and HLS. Maintain a separate HLS ManifestID map for playlists, and have the RTMP connection struct hold the HLS ManifestID. This requires more bookkeeping for the separate map, but allows us to keep the ManifestIDs for RTMP and HLS strictly separate.

How did you test each of these updates (required)

  • Unit test coverage improved around the area of changes.
  • Manual testing. Start up a broadcast node and stream into it from several ffmpeg instances. Ensure segmentation succeeds, RTMP playback succeeds, and that the transcoding flow still works for all streams.
  • Test various CLI endpoints, including /status and /debug to ensure the expected (same) data is returned. The output isn't pretty but eyeballing it is enough for now.
  • The livepeer_cli doesn't quite work because it isn't tuned for offchain mode yet, and I don't have a working onchain mode here either. But it should be OK since the webserver changes are minimal.

Does this pull request close any open issues?

Checklist:

  • README and other documentation updated
  • Node runs in OSX and devenv

@j0sh (Collaborator, Author) commented on Dec 14, 2018

Ready for review.

@j0sh changed the title from "WIP: Re-enable support for multiple streams." to "Re-enable support for multiple streams." on Dec 14, 2018

ExposeCurrentManifest bool

rtmpStreams map[core.StreamID]stream.RTMPVideoStream
broadcastRtmpToManifestMap map[string]string
rtmpConnections map[core.ManifestID]*rtmpConnection

Member:

Could we use a sync.Map here instead of a normal Map + Mutex?

Collaborator (Author):

Hadn't known about sync.Map -- thanks for the tip!

Looking into it, a sync.Map would probably work here, since we don't really need to assure mutual exclusion outside reading/writing the map itself. However, the documentation seems to discourage its use:

Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content. ... The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys

In particular, we don't fit the first case of an "always-growing cache". We sort of fit the second, but we'd still have concurrent access to the same set of keys (eg, playback).

Do you still feel that the sync.Map would be appropriate? I'm fairly indifferent; it would certainly help reduce manual locking (and the potential for mistakes associated with that), but I'm unsure if the caveats should be a concern. In particular, the lack of type safety makes me twitch a bit.
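
For comparison, a minimal sketch of what the sync.Map alternative being discussed would look like (illustrative only; the thread below settles on keeping the plain map plus a lock):

 package main

 import (
        "fmt"
        "sync"
 )

 // Simplified stand-in for the real rtmpConnection struct.
 type rtmpConnection struct{ nonce uint64 }

 func main() {
        var conns sync.Map // key: manifest ID, value: *rtmpConnection

        conns.Store("manifestID", &rtmpConnection{nonce: 1})

        // Every load needs a type assertion -- the type-safety cost noted above.
        if v, ok := conns.Load("manifestID"); ok {
                cxn := v.(*rtmpConnection)
                fmt.Println(cxn.nonce)
        }
 }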

Member:

When you mention playback requiring concurrent access to the same set of keys, are you referring to the access in getHLSMasterPlaylistHandler()?

If we have concurrent access to the same set of keys as a result of the above, then I think it makes sense not to use sync.Map. I originally suggested using sync.Map because I thought rtmpConnections fell under category 2 usage from the documentation. But if not, in the interest of keeping type safety and only using sync.Map when usage falls under one of those 2 categories, I think sticking with a plain Go map + mutex makes sense.

Collaborator (Author):

Yeah, there actually are a number of places that could read the same keys, although getHLSMasterPlaylistHandler and getHLSMediaPlaylistHandler are the major ones. Based on this conversation, I went ahead and changed those places to use a read lock (which is more correct in our case), here's the result (as diff'd from the streamid refactor branch):

diff --git a/server/mediaserver.go b/server/mediaserver.go
index 04a67632..155dd9ba 100644
--- a/server/mediaserver.go
+++ b/server/mediaserver.go
@@ -75,7 +75,7 @@ type LivepeerServer struct {
 	ExposeCurrentManifest bool
 
 	rtmpConnections map[core.ManifestID]*rtmpConnection
-	connectionLock  *sync.Mutex
+	connectionLock  *sync.RWMutex
 }
 
 func NewLivepeerServer(rtmpAddr string, httpAddr string, lpNode *core.LivepeerNode) *LivepeerServer {
@@ -91,7 +91,7 @@ func NewLivepeerServer(rtmpAddr string, httpAddr string, lpNode *core.LivepeerNo
 		opts.HttpMux = http.NewServeMux()
 	}
 	server := lpmscore.New(&opts)
-	return &LivepeerServer{RTMPSegmenter: server, LPMS: server, LivepeerNode: lpNode, HttpMux: opts.HttpMux, connectionLock: &sync.Mutex{}, rtmpConnections: make(map[core.ManifestID]*rtmpConnection)}
+	return &LivepeerServer{RTMPSegmenter: server, LPMS: server, LivepeerNode: lpNode, HttpMux: opts.HttpMux, connectionLock: &sync.RWMutex{}, rtmpConnections: make(map[core.ManifestID]*rtmpConnection)}
 }
 
 //StartServer starts the LPMS server
@@ -517,7 +517,7 @@ func getHLSMasterPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Ma
 			manifestID = sid.ManifestID
 		}
 
-		s.connectionLock.Lock()
+		s.connectionLock.RLock()
-		defer s.connectionLock.Unlock()
+		defer s.connectionLock.RUnlock()
 		cxn, ok := s.rtmpConnections[manifestID]
 		if !ok || cxn.pl == nil {
@@ -536,7 +536,7 @@ func getHLSMediaPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Med
 	return func(url *url.URL) (*m3u8.MediaPlaylist, error) {
 		strmID := parseStreamID(url.Path)
 		mid := strmID.ManifestID
-		s.connectionLock.Lock()
+		s.connectionLock.RLock()
-		defer s.connectionLock.Unlock()
+		defer s.connectionLock.RUnlock()
 		cxn, ok := s.rtmpConnections[mid]
 		if !ok || cxn.pl == nil {
@@ -589,7 +589,7 @@ func getHLSSegmentHandler(s *LivepeerServer) func(url *url.URL) ([]byte, error)
 func getRTMPStreamHandler(s *LivepeerServer) func(url *url.URL) (stream.RTMPVideoStream, error) {
 	return func(url *url.URL) (stream.RTMPVideoStream, error) {
 		mid := parseManifestID(url.Path)
-		s.connectionLock.Lock()
+		s.connectionLock.RLock()
 		cxn, ok := s.rtmpConnections[mid]
-		s.connectionLock.Unlock()
+		s.connectionLock.RUnlock()
 		if !ok {
@@ -627,7 +627,7 @@ func (s *LivepeerServer) GetNodeStatus() *net.NodeStatus {
 	// not threadsafe; need to deep copy the playlist
 	m := make(map[string]*m3u8.MasterPlaylist, 0)
 
-	s.connectionLock.Lock()
+	s.connectionLock.RLock()
-	defer s.connectionLock.Unlock()
+	defer s.connectionLock.RUnlock()
 	for _, cxn := range s.rtmpConnections {
 		if cxn.sess == nil || cxn.pl == nil {
@@ -641,7 +641,7 @@ func (s *LivepeerServer) GetNodeStatus() *net.NodeStatus {
 
 // Debug helpers
 func (s *LivepeerServer) LatestPlaylist() core.PlaylistManager {
-	s.connectionLock.Lock()
+	s.connectionLock.RLock()
-	defer s.connectionLock.Unlock()
+	defer s.connectionLock.RUnlock()
 	cxn, ok := s.rtmpConnections[LastManifestID]
 	if !ok || cxn.pl == nil {


const HLSWaitInterval = time.Second
const HLSBufferCap = uint(43200) //12 hrs assuming 1s segment
const HLSBufferWindow = uint(5)
const StreamKeyBytes = 6

Member:

Out of curiosity is there a particular reason for 6 bytes?

Collaborator (Author):

No reason. Should it be 4? 5? 8? 32?

The stream key only makes it harder for an attacker to read the incoming RTMP. A typical node really should have monitoring for repeated attempts in case someone tries to brute force things, eg fail2ban. However, at the moment, we don't really have the logging granularity to facilitate this monitoring, nor do we expose enough information to the goclient to determine data like incoming RTMP IP addresses. We should though.
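
(For rough scale: 6 random bytes give a keyspace of 2^48 ≈ 2.8 × 10^14 values, and 8 bytes would give 2^64 ≈ 1.8 × 10^19; either way, guessing a key online requires an enormous number of attempts, hence the emphasis on monitoring above.)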

Member:

Got it - seems fine as long as we have a constant for it!

Contributor:

Wouldn't using 8 bytes result in fewer lines of code because we can just use rand.Uint64()?

@j0sh (Collaborator, Author) commented on Dec 19, 2018:

We'd still want to use our RNG helpers to generate the stream key. It seems that something else also mutates the RNG state during testing, so resetting the seed alone is not enough to ensure deterministic results in tests.

Also see 735482a

Member:

Wouldn't the built-in helpers from the rand package (i.e. rand.Uint64()) be relying on the same random seed (since they rely on the same default source) as RandomIdGenerator (https://github.com/livepeer/go-livepeer/blob/master/core/streamdata.go#L49) since this function is using rand.Uint32() under the hood?

Collaborator (Author):

Correct, and we can't rely on resetting the seed to give us consistent behavior during unit tests. Hence we want to generate the stream key via the monkey-patchable RandomIdGenerator that's introduced above, to get deterministic values for testing.
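
Building on the RandomIdGenerator sketch in the RNG section above, a test-file fragment could pin the key material roughly like this (illustrative; the PR's actual tests may differ -- imports needed: bytes, encoding/hex, testing):

 func TestDeterministicStreamKey(t *testing.T) {
        // Swap in a fixed generator and restore the original afterwards.
        old := RandomIdGenerator
        defer func() { RandomIdGenerator = old }()
        RandomIdGenerator = func(length uint) []byte {
                return bytes.Repeat([]byte{0xab}, int(length))
        }

        got := hex.EncodeToString(RandomIdGenerator(StreamKeyBytes))
        if got != "abababababab" {
                t.Errorf("unexpected stream key %q", got)
        }
 }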

Member:

Ah I see - I initially missed your point about monkey patching RandomIdGenerator. 👍

s.connectionLock.Lock()
defer s.connectionLock.Unlock()
cxn, ok := s.rtmpConnections[mid]
if !ok || cxn.pl == nil {

Member:

Are there cases where we could have a rtmpConnection without a core.PlaylistManager? Or is this just for extra safety?

Collaborator (Author):

This check is just for safety -- at the moment, there shouldn't be any cases where the PlaylistManager is empty.

@j0sh force-pushed the ja/multistream branch 2 times, most recently from f33a53e to 3078504 on December 18, 2018 18:01
storage := drivers.NodeStorage.NewSession(string(mid))
pl := core.NewBasicPlaylistManager(mid, storage)
if _, err := s.startBroadcast(pl); err != ErrDiscovery {
glog.Error("Expected error with discovery")

Member:

Why are we doing glog.Error instead of t.Errorf in TestStartBroadcast?

@j0sh (Collaborator, Author) commented on Dec 18, 2018:

Typo! Good catch, fixed.

Note that we use glog.Error in a few other places such as SegmentRTMPToHLS which is right above this function. https://github.com/livepeer/go-livepeer/blob/master/server/mediaserver_test.go#L37

@ericxtang (Member) left a comment:

LGTM. 🚢 after @yondonfu is good with the PR too.

glog.Error("Error constructing manifest ID", err)
return ""
}
s.connectionLock.Lock()

Member:

Doesn't look like we're writing here - should this be s.connectionLock.RLock()?

Collaborator (Author):

Yup, fixed

s.VideoNonceLock.Unlock()
s.connectionLock.Lock()
cxn, active := s.rtmpConnections[mid]
s.connectionLock.Unlock()

Member:

Since we potentially modify the pointer cxn.sess on L490, should this unlock statement be after the active check (maybe using defer)?

Collaborator (Author):

Yeah, that's probably safer. Fixed.

Right now, we don't really have a strong requirement for mutual exclusion of the cxn object (eg, cxn is currently unprotected elsewhere in gotRTMPStreamHandler), but using defer makes things more consistent with the rest of the code and gives us that extra bit of safety should it ever come up later.
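
For reference, a self-contained sketch of the pattern just described (generic names, not the PR's code): the presence check and the pointer update both happen while the lock is held, with the unlock deferred.

 package main

 import (
        "fmt"
        "sync"
 )

 type conn struct{ sess string }

 type server struct {
        mu    sync.RWMutex // write lock used here, since the session is mutated
        conns map[string]*conn
 }

 // refresh mutates cxn.sess only after the presence check; the deferred
 // unlock keeps the whole read-then-write sequence under the lock.
 func (s *server) refresh(id, sess string) error {
        s.mu.Lock()
        defer s.mu.Unlock()
        cxn, active := s.conns[id]
        if !active {
                return fmt.Errorf("stream %s not active", id)
        }
        cxn.sess = sess
        return nil
 }

 func main() {
        s := &server{conns: map[string]*conn{"abc": {}}}
        fmt.Println(s.refresh("abc", "session2"), s.conns["abc"].sess)
 }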

  • Avoids the need to maintain separate maps tracking various things.
  • This allows for multi stream support.
  • This avoids collisions when creating concurrent StreamIDs.

@yondonfu (Member) left a comment:

LGTM


ExposeCurrentManifest bool

rtmpStreams map[core.StreamID]stream.RTMPVideoStream
broadcastRtmpToManifestMap map[string]string
rtmpConnections map[core.ManifestID]*rtmpConnection

Contributor:

@j0sh I'm not sure I understand why rtmpConnections is part of the LivepeerServer struct which is shared by both B and O roles. What's the best way for me to better understand why Os need to hold variables only useful for B?

thanks!

Collaborator (Author):

O doesn't use most of the fields in LivepeerServer -- in fact, the RTMP listener isn't even started for O.

What you're seeing is a reflection of how things have evolved up to this point, same as how we have a single LivepeerNode with fields that are only used by one or the other. We can clean this up at some point.

@ericxtang merged commit 663db3e into master on Dec 19, 2018
@angyangie mentioned this pull request on Jan 2, 2019
@yondonfu mentioned this pull request on Jan 3, 2019
@j0sh mentioned this pull request on Jan 12, 2019
@j0sh deleted the ja/multistream branch on December 19, 2019