Re-enable support for multiple streams. #618
Conversation
Ready for review.
ExposeCurrentManifest bool

rtmpStreams map[core.StreamID]stream.RTMPVideoStream
broadcastRtmpToManifestMap map[string]string
rtmpConnections map[core.ManifestID]*rtmpConnection
Could we use a sync.Map here instead of a normal Map + Mutex?
Hadn't known about sync.Map -- thanks for the tip!
Looking into it, a sync.Map would probably work here, since we don't really need to assure mutual exclusion outside reading/writing the map itself. However, the documentation seems to discourage its use:
Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content. ... The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys
In particular, we don't fit the first case of an "always-growing cache". We sort of fit the second, but we'd still have concurrent access to the same set of keys (eg, playback).
Do you still feel that the sync.Map would be appropriate? I'm fairly indifferent; it would certainly help reduce manual locking (and the potential for mistakes associated with that), but I'm unsure if the caveats should be a concern. In particular, the lack of type safety makes me twitch a bit.
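For illustration (an aside, not from the original thread), here is roughly what the two options look like for the rtmpConnections map, assuming the core.ManifestID and rtmpConnection types from the snippet above; the main difference is the type assertion that sync.Map forces on readers:

```go
// sync.Map: no explicit locking, but values come back as interface{}.
var conns sync.Map // effectively map[core.ManifestID]*rtmpConnection

func lookupSync(mid core.ManifestID) (*rtmpConnection, bool) {
	v, ok := conns.Load(mid)
	if !ok {
		return nil, false
	}
	cxn, ok := v.(*rtmpConnection) // type assertion required on every read
	return cxn, ok
}

// Plain map + RWMutex: type-safe, and concurrent readers share a read lock.
var (
	connLock sync.RWMutex
	connMap  = make(map[core.ManifestID]*rtmpConnection)
)

func lookupLocked(mid core.ManifestID) (*rtmpConnection, bool) {
	connLock.RLock()
	defer connLock.RUnlock()
	cxn, ok := connMap[mid]
	return cxn, ok
}
```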
When you mention playback requiring concurrent access to the same set of keys, are you referring to the access in getHLSMasterPlaylistHandler()?
If we have concurrent access to the same set of keys as a result of the above, then I think it makes sense to stick with a plain Go map + mutex. I originally suggested using sync.Map because I thought rtmpConnections fell under category 2 usage from the documentation. But if not, in the interest of keeping type safety and only using sync.Map when usage falls under one of those two categories, I think sticking with a plain Go map + mutex makes sense.
Yeah, there actually are a number of places that could read the same keys, although getHLSMasterPlaylistHandler and getHLSMediaPlaylistHandler are the major ones. Based on this conversation, I went ahead and changed those places to use a read lock (which is more correct in our case). Here's the result (as diff'd from the streamid refactor branch):
diff --git a/server/mediaserver.go b/server/mediaserver.go
index 04a67632..155dd9ba 100644
--- a/server/mediaserver.go
+++ b/server/mediaserver.go
@@ -75,7 +75,7 @@ type LivepeerServer struct {
ExposeCurrentManifest bool
rtmpConnections map[core.ManifestID]*rtmpConnection
- connectionLock *sync.Mutex
+ connectionLock *sync.RWMutex
}
func NewLivepeerServer(rtmpAddr string, httpAddr string, lpNode *core.LivepeerNode) *LivepeerServer {
@@ -91,7 +91,7 @@ func NewLivepeerServer(rtmpAddr string, httpAddr string, lpNode *core.LivepeerNo
opts.HttpMux = http.NewServeMux()
}
server := lpmscore.New(&opts)
- return &LivepeerServer{RTMPSegmenter: server, LPMS: server, LivepeerNode: lpNode, HttpMux: opts.HttpMux, connectionLock: &sync.Mutex{}, rtmpConnections: make(map[core.ManifestID]*rtmpConnection)}
+ return &LivepeerServer{RTMPSegmenter: server, LPMS: server, LivepeerNode: lpNode, HttpMux: opts.HttpMux, connectionLock: &sync.RWMutex{}, rtmpConnections: make(map[core.ManifestID]*rtmpConnection)}
}
//StartServer starts the LPMS server
@@ -517,7 +517,7 @@ func getHLSMasterPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Ma
manifestID = sid.ManifestID
}
- s.connectionLock.Lock()
+ s.connectionLock.RLock()
defer s.connectionLock.Unlock()
cxn, ok := s.rtmpConnections[manifestID]
if !ok || cxn.pl == nil {
@@ -536,7 +536,7 @@ func getHLSMediaPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Med
return func(url *url.URL) (*m3u8.MediaPlaylist, error) {
strmID := parseStreamID(url.Path)
mid := strmID.ManifestID
- s.connectionLock.Lock()
+ s.connectionLock.RLock()
defer s.connectionLock.Unlock()
cxn, ok := s.rtmpConnections[mid]
if !ok || cxn.pl == nil {
@@ -589,7 +589,7 @@ func getHLSSegmentHandler(s *LivepeerServer) func(url *url.URL) ([]byte, error)
func getRTMPStreamHandler(s *LivepeerServer) func(url *url.URL) (stream.RTMPVideoStream, error) {
return func(url *url.URL) (stream.RTMPVideoStream, error) {
mid := parseManifestID(url.Path)
- s.connectionLock.Lock()
+ s.connectionLock.RLock()
cxn, ok := s.rtmpConnections[mid]
s.connectionLock.Unlock()
if !ok {
@@ -627,7 +627,7 @@ func (s *LivepeerServer) GetNodeStatus() *net.NodeStatus {
// not threadsafe; need to deep copy the playlist
m := make(map[string]*m3u8.MasterPlaylist, 0)
- s.connectionLock.Lock()
+ s.connectionLock.RLock()
defer s.connectionLock.Unlock()
for _, cxn := range s.rtmpConnections {
if cxn.sess == nil || cxn.pl == nil {
@@ -641,7 +641,7 @@ func (s *LivepeerServer) GetNodeStatus() *net.NodeStatus {
// Debug helpers
func (s *LivepeerServer) LatestPlaylist() core.PlaylistManager {
- s.connectionLock.Lock()
+ s.connectionLock.RLock()
defer s.connectionLock.Unlock()
cxn, ok := s.rtmpConnections[LastManifestID]
if !ok || cxn.pl == nil {
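One caveat worth flagging alongside this diff (an editorial aside, not from the original thread): a sync.RWMutex read lock has to be released with RUnlock rather than Unlock, so the deferred unlocks would need the matching change as well, e.g.:

```go
s.connectionLock.RLock()
defer s.connectionLock.RUnlock() // RLock pairs with RUnlock; calling Unlock here is a run-time error
```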
const HLSWaitInterval = time.Second
const HLSBufferCap = uint(43200) //12 hrs assuming 1s segment
const HLSBufferWindow = uint(5)
const StreamKeyBytes = 6 |
Out of curiosity, is there a particular reason for 6 bytes?
No reason. Should it be 4? 5? 8? 32?
The stream key only makes it harder for an attacker to read the incoming RTMP. A typical node really should have monitoring for repeated attempts in case someone tries to brute force things, eg fail2ban. However, at the moment, we don't really have the logging granularity to facilitate this monitoring, nor do we expose enough information to the goclient to determine data like incoming RTMP IP addresses. We should though.
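(For reference: 6 random bytes give a keyspace of 2^48 ≈ 2.8 × 10^14 possible stream keys, versus 2^64 ≈ 1.8 × 10^19 for 8 bytes.)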
Got it - seems fine as long as we have a constant for it!
Wouldn't using 8 bytes result in fewer lines of code, because we could just use rand.Uint64()?
We'd still want to use our RNG helpers to generate the stream key. It seems that something else also mutates the RNG state during testing, so resetting the seed alone is not the best way to ensure deterministic results.
Also see 735482a
Wouldn't the built-in helpers from the rand package (i.e. rand.Uint64()) rely on the same random seed as RandomIdGenerator (https://github.com/livepeer/go-livepeer/blob/master/core/streamdata.go#L49), since they share the same default source and that function uses rand.Uint32() under the hood?
Correct, and we can't rely on resetting the seed to give us consistent behavior during unit tests. Hence we want to generate the stream key via the monkey-patchable RandomIdGenerator that's introduced above, to get deterministic values for testing.
Ah I see - I initially missed your point about monkey patching RandomIdGenerator. 👍
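To make the pattern concrete, here is a minimal sketch of a monkey-patchable ID generator of the kind discussed above; the real RandomIdGenerator lives in core/streamdata.go, and the signature, helper, and test here are illustrative (combined into one snippet for brevity):

```go
package core

import (
	"bytes"
	"encoding/hex"
	"math/rand"
	"testing"
)

// RandomIdGenerator is a package-level function variable so tests can swap
// in a deterministic implementation (monkey patching).
var RandomIdGenerator = func(length uint) []byte {
	x := make([]byte, length)
	for i := range x {
		x[i] = byte(rand.Uint32()) // backed by the default math/rand source
	}
	return x
}

// TestStreamKeyDeterministic shows how a test patches the generator to get a
// predictable stream key, independent of the global RNG state.
func TestStreamKeyDeterministic(t *testing.T) {
	old := RandomIdGenerator
	defer func() { RandomIdGenerator = old }()
	RandomIdGenerator = func(length uint) []byte {
		return bytes.Repeat([]byte{0xAB}, int(length))
	}
	key := hex.EncodeToString(RandomIdGenerator(6)) // 6 == StreamKeyBytes
	if key != "abababababab" {
		t.Errorf("unexpected key %q", key)
	}
}
```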
s.connectionLock.Lock()
defer s.connectionLock.Unlock()
cxn, ok := s.rtmpConnections[mid]
if !ok || cxn.pl == nil { |
Are there cases where we could have an rtmpConnection without a core.PlaylistManager? Or is this just for extra safety?
This check is just for safety -- at the moment, there shouldn't be any cases where the PlaylistManager is empty.
f33a53e to 3078504
server/mediaserver_test.go (outdated)
storage := drivers.NodeStorage.NewSession(string(mid))
pl := core.NewBasicPlaylistManager(mid, storage)
if _, err := s.startBroadcast(pl); err != ErrDiscovery {
glog.Error("Expected error with discovery") |
Why are we doing glog.Error instead of t.Errorf in TestStartBroadcast?
Typo! Good catch, fixed.
Note that we use glog.Error in a few other places, such as SegmentRTMPToHLS which is right above this function. https://github.com/livepeer/go-livepeer/blob/master/server/mediaserver_test.go#L37
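For reference, the corrected check presumably looks something like this (a sketch reusing the variables from the snippet above, with setup elided), so the mismatch actually fails the test instead of merely logging:

```go
if _, err := s.startBroadcast(pl); err != ErrDiscovery {
	t.Errorf("Expected ErrDiscovery, got %v", err)
}
```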
Needed to clean up playlists after an RTMP disconnect.
LGTM. 🚢 after @yondonfu is good with the PR too.
server/mediaserver.go (outdated)
glog.Error("Error constructing manifest ID", err) | ||
return "" | ||
} | ||
s.connectionLock.Lock() |
Doesn't look like we're writing here - should this be s.connectionLock.RLock()?
Yup, fixed
server/mediaserver.go (outdated)
s.VideoNonceLock.Unlock()
s.connectionLock.Lock()
cxn, active := s.rtmpConnections[mid]
s.connectionLock.Unlock()
Since we potentially modify the pointer cxn.sess on L490, should this unlock statement be after the active check (maybe using defer)?
Yeah, that's probably safer. Fixed.
Right now, we don't really have a strong requirement for mutual exclusion of the cxn object (eg, cxn is currently unprotected elsewhere in gotRTMPStreamHandler), but using defer makes things more consistent with the rest of the code and gives us that extra bit of safety should it ever come up later.
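A quick sketch of the adjusted pattern (based on the quoted snippet, not the exact committed code): the lock is held via defer across the active check and any subsequent use of cxn:

```go
s.connectionLock.Lock()
defer s.connectionLock.Unlock()
cxn, active := s.rtmpConnections[mid]
if !active {
	// handle the missing connection and return
}
// cxn (including cxn.sess) can be touched safely while the lock is held
```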
Avoids the need to maintain separate maps tracking various things.
This allows for multi stream support.
This avoids collisions when creating concurrent StreamIDs.
LGTM
ExposeCurrentManifest bool

rtmpStreams map[core.StreamID]stream.RTMPVideoStream
broadcastRtmpToManifestMap map[string]string
rtmpConnections map[core.ManifestID]*rtmpConnection
@j0sh I'm not sure I understand why rtmpConnections is part of the LivepeerServer struct, which is shared by both B and O roles. What's the best way for me to better understand why Os need to hold variables only useful for B? Thanks!
O doesn't use most of the fields in LivepeerServer -- in fact, the RTMP listener isn't even started for O.
What you're seeing is a reflection of how things have evolved up to this point, same as how we have a single LivepeerNode with fields that are only used by one or the other. We can clean this up at some point.
What does this pull request do? Explain your changes. (required)
This enables broadcasters to receive multiple incoming RTMP streams.
Multistream Changes
RNG
Seed the RNG when LivepeerNode is initialized, rather than each time RandomIdGenerator is called. This makes tests more predictable (eg, we can reset the RNG for fixed randomness, such as testing ID collisions) and mitigates certain types of testing failures around concurrency.
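A minimal sketch of the idea, with the constructor heavily simplified (the real NewLivepeerNode takes more arguments and does much more):

```go
package core

import (
	"math/rand"
	"time"
)

type LivepeerNode struct {
	// fields elided
}

func NewLivepeerNode( /* dependencies elided */ ) *LivepeerNode {
	// Seed the default math/rand source once at node construction instead of
	// re-seeding on every RandomIdGenerator call; tests can override the seed
	// (or monkey-patch the generator) to get reproducible IDs.
	rand.Seed(time.Now().UnixNano())
	return &LivepeerNode{}
}
```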
Common Struct for Per-Connection State
We add the rtmpConnection struct in lieu of holding similar information in separate maps.

RTMP-HLS ManifestID Sharing
Refresher: a StreamID is composed of a ManifestID plus a rendition.
Previously, RTMP StreamIDs were independent from HLS StreamIDs. This offered protection from hijacking since RTMP URIs were uncorrelated to HLS playback URLs.
To support multi-stream, we incorporate the HLS playlist into the rtmpConnection struct alongside other elements of the RTMP connection. However, to look up the playlist at playback time, the rtmpConnection needs to be accessible using the HLS StreamID alone (a playlist is accessed via its HTTP path). We solve this by sharing the ManifestID between RTMP and HLS.
However, by sharing the ManifestID for HLS and RTMP, we open ourselves up to stream hijacking due to the easily guessable RTMP StreamID structure, which uses "RTMP" as the rendition.
Stream Keys
To mitigate RTMP hijacking based on a known ManifestID, we replace the fixed "RTMP" rendition with a random stream key. This preserves the StreamID construct while restoring most of the security properties of the earlier setup.
Authorizing Incoming RTMP
Note that the stream key only protects playback (eg, via the segmenter); it doesn't prevent unauthorized streams from being ingested.
The current thinking for accomplishing this is to have a callback to some user-specified API. The API can then validate the RTMP path. If validation passes, the path can be used as the Manifest ID.
For example, streaming into an RTMP URL whose path contains abc/def would generate a ManifestID of abc and a stream key of def, for a StreamID of abcdef.
Later on we can discuss and experiment with variations on this, such as making the stream key optional during authorization and generating it if omitted from the path. And so forth...
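To make the callback idea concrete, here is a hypothetical sketch; nothing like this exists in the PR, and the webhook URL, form field, and function name are all invented. The node would POST the incoming RTMP path to a user-specified endpoint and, on approval, use the path as the ManifestID:

```go
// authorizeStream is a hypothetical helper; it returns the validated path,
// which would then be used as the ManifestID.
func authorizeStream(webhookURL, rtmpPath string) (string, error) {
	resp, err := http.PostForm(webhookURL, url.Values{"path": {rtmpPath}})
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("stream rejected: %s", resp.Status)
	}
	return rtmpPath, nil
}
```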
Alternative: Introduce a layer of indirection rather than sharing ManifestIDs between RTMP and HLS. Maintain a separate HLS ManifestID map for playlists, with the RTMP connection struct holding the HLS ManifestID. This requires more bookkeeping for the separate map, but allows us to keep the ManifestIDs for RTMP and HLS strictly separate.
How did you test each of these updates (required)
Checked /status and /debug to ensure the expected (same) data is returned. The output isn't pretty but eyeballing it is enough for now.
livepeer_cli doesn't quite work because it isn't tuned for offchain mode yet and I don't have a working onchain mode here yet. But should be OK since the webserver changes are minimal.
Does this pull request close any open issues?
Checklist:
README and other documentation updated