Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Add suspending sessions that did not pass the pHash verification #2103

Merged
merged 7 commits into from
Nov 22, 2021
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- \#2085 Set max refresh sessions threshold to 8 (@yondonfu)
- \#2083 Return 422 to the push client after max retry attempts for a segment (@jailuthra)
- \#2022 Randomize selection of orchestrators in untrusted pool at a random frequency (@yondonfu)
- \#2103 Suspend sessions that did not pass p-hash verification (@leszko)

#### Orchestrator

Expand Down
13 changes: 11 additions & 2 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ func (bsm *BroadcastSessionsManager) chooseResults(submitResultsCh chan *SubmitR
trustedResults.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl, err)
return nil, nil, err
}

var sessionsToSuspend []*BroadcastSession
for _, untrustedResult := range untrustedResults {
untrustedHash, err := drivers.GetSegmentData(untrustedResult.TranscodeResult.Segments[segmToCheckIndex].PerceptualHashUrl)
if err != nil {
Expand All @@ -542,9 +544,16 @@ func (bsm *BroadcastSessionsManager) chooseResults(submitResultsCh chan *SubmitR
if untrustedResult.Err == nil {
bsm.sessionVerified(untrustedResult.Session)
}
// suspend sessions which returned incorrect results
for _, s := range sessionsToSuspend {
bsm.suspendAndRemoveOrch(s)
}
return untrustedResult.Session, untrustedResult.TranscodeResult, untrustedResult.Err
} else if monitor.Enabled {
monitor.FastVerificationFailed()
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this exact issue or very high priority, so maybe we should open a new issue/PR for this:

Maybe we should reset our verifiedSession to nil if the hash it sends doesn't match?

Suggested change
} else {
} else {
if (bsm.verifiedSession == untrustedResult.Session) {
bsm.verifiedSession = nil
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see one issue with adding this condition. If we add it, then a false-negative will cause bsm.verifiedSession = nil and in a result, we'll start looking for a new session. That can in turn cause bouncing between sessions if we encounter many false-negative scenarios.

We decided that we'll suspend sessions only if it failed p-hash verification but at the same time the other untrusted sessions passed the verification. We could do the same and maybe set verifiedSession to nil when we suspend the session, but technically it does not matter, because this condition takes into consideration both bsm.verifiedSession = nil and excluding suspended sessions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about false negatives causing swaps, and yeah we'd anyway suspend a verified session and look for a new one if the other untrusted session passes. LGTM feel free to merge 👍

sessionsToSuspend = append(sessionsToSuspend, untrustedResult.Session)
if monitor.Enabled {
monitor.FastVerificationFailed()
}
}
}

Expand Down
83 changes: 81 additions & 2 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,7 @@ func TestPush_ReuseIntmidWithDiffExtmid(t *testing.T) {
assert.False(extEx2)
serverCleanup(s)
}

func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -1590,7 +1591,31 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("UNtrusted transcoded binary data"))
})
unverifiedHash := goodHash
unverifiedHashCalled := 0
mux2.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(unverifiedHash)
unverifiedHashCalled++
})

ts3, mux3 := stubTLSServer()
defer ts3.Close()
tSegData3 := []*net.TranscodedSegmentData{{Url: ts3.URL + segPath, Pixels: 100, PerceptualHashUrl: ts3.URL + segPath + ".phash"}}
tr3 := dummyRes(tSegData3)
buf3, err := proto.Marshal(tr3)
require.Nil(t, err)
mux3.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
// delay so it will be chosen second
time.Sleep(50 * time.Millisecond)
w.WriteHeader(http.StatusOK)
w.Write(buf3)
})
mux3.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("second UNtrusted transcoded binary data"))
})
mux3.HandleFunc(segPath+".phash", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(goodHash)
})
Expand All @@ -1604,14 +1629,24 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
sess2.Params.ManifestID = "mani"
sess2.OrchestratorScore = common.Score_Untrusted

bsm := bsmWithSessListExt([]*BroadcastSession{sess1}, []*BroadcastSession{sess2}, false)
sess3 := StubBroadcastSession(ts3.URL)
glog.Errorf("====> ts 3 url %s ", ts3.URL)
yondonfu marked this conversation as resolved.
Show resolved Hide resolved
sess3.Params.Profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
sess3.Params.ManifestID = "mani"
sess3.OrchestratorScore = common.Score_Untrusted

bsm := bsmWithSessListExt([]*BroadcastSession{sess1}, []*BroadcastSession{sess3, sess2}, false)
bsm.VerificationFreq = 1
assert.Equal(0, bsm.untrustedPool.sus.count)
// hack: stop pool from refreshing
bsm.untrustedPool.refreshing = true

url, _ := url.ParseRequestURI("test://some.host")
osd := drivers.NewMemoryDriver(url)
osSession := osd.NewSession("testPath")
sess1.BroadcasterOS = osSession
sess2.BroadcasterOS = osSession
sess3.BroadcasterOS = osSession

oldjpqt := core.JsonPlaylistQuitTimeout
defer func() {
Expand Down Expand Up @@ -1652,7 +1687,7 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert.Nil(err)
assert.Contains(params, "name")
assert.Len(params, 1)
assert.Equal(params["name"], "P144p25fps16x9_17.ts")
assert.Equal("P144p25fps16x9_17.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_17.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
Expand All @@ -1665,4 +1700,48 @@ func TestPush_MultipartReturnMultiSession(t *testing.T) {
assert.Equal(1, i)
assert.Equal(uint64(12), cxn.sourceBytes)
assert.Equal(uint64(32), cxn.transcodedBytes)

// now make unverified to respond with bash hash
leszko marked this conversation as resolved.
Show resolved Hide resolved
unverifiedHash = []byte{0}
reader = strings.NewReader("InsteadOf.TS")
w = httptest.NewRecorder()
req = httptest.NewRequest("POST", "/live/mani/18.ts", reader)

req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
resp = w.Result()
defer resp.Body.Close()
assert.Equal(200, resp.StatusCode)

mediaType, params, err = mime.ParseMediaType(resp.Header.Get("Content-Type"))
assert.Equal("multipart/mixed", mediaType)
assert.Nil(err)
mr = multipart.NewReader(resp.Body, params["boundary"])
i = 0
for {
p, err := mr.NextPart()
if err == io.EOF {
break
}
assert.NoError(err)
mediaType, params, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
assert.Nil(err)
assert.Contains(params, "name")
assert.Len(params, 1)
assert.Equal("P144p25fps16x9_18.ts", params["name"])
assert.Equal(`attachment; filename="P144p25fps16x9_18.ts"`, p.Header.Get("Content-Disposition"))
assert.Equal("P144p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
assert.NoError(err)
assert.Equal("video/mp2t", strings.ToLower(mediaType))
assert.Equal("second UNtrusted transcoded binary data", string(bodyPart))

i++
}
assert.Equal(1, i)
assert.Equal(uint64(12*2), cxn.sourceBytes)
assert.Equal(uint64(71), cxn.transcodedBytes)
assert.Equal(2, unverifiedHashCalled)
assert.Contains(bsm.untrustedPool.sus.list, ts2.URL)
assert.Equal(0, bsm.untrustedPool.sus.count)
}