Skip to content

Commit

Permalink
Refine HTTP streaming error.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Sep 3, 2024
1 parent 27817e7 commit 746d432
Showing 1 changed file with 26 additions and 133 deletions.
159 changes: 26 additions & 133 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -159,8 +158,6 @@ func (v *httpServer) Run(ctx context.Context) error {
type HTTPStreaming struct {
// The context for HTTP streaming.
ctx context.Context
// Whether has written response to client.
written bool
}

func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming {
Expand All @@ -175,62 +172,30 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
ctx := logger.WithContext(v.ctx)

var backendClosedErr, clientClosedErr bool

handleBackendErr := func(err error) {
if isPeerClosedError(err) {
if !backendClosedErr {
backendClosedErr = true
logger.Df(ctx, "HTTP backend peer closed")
}
} else {
logger.Wf(ctx, "HTTP backend err %+v", err)
}
}

handleClientErr := func(err error) {
if isPeerClosedError(err) {
if !clientClosedErr {
clientClosedErr = true
logger.Df(ctx, "HTTP client peer closed")
}
} else {
logger.Wf(ctx, "HTTP client %v err %+v", r.RemoteAddr, err)
}
}

handleErr := func(err error) {
if perr, ok := err.(*RTMPProxyError); ok {
if perr.isBackend {
handleBackendErr(perr.err)
} else {
handleClientErr(perr.err)
}
} else {
handleClientErr(err)
}
}

if err := v.serve(ctx, w, r); err != nil {
if merr, ok := err.(*RTMPMultipleError); ok {
// If multiple errors, handle all of them.
for _, err := range merr.errs {
handleErr(err)
}
} else {
// If single error, directly handle it.
handleErr(err)
}

if !v.written {
apiError(ctx, w, r, err)
}
apiError(ctx, w, r, err)
} else {
logger.Df(ctx, "HTTP client done")
}
}

func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// Always support CORS. Note that browser may send origin header for m3u8, but no origin header
// for ts. So we always response CORS header.
if true {
// SRS does not need cookie or credentials, so we disable CORS credentials, and use * for CORS origin,
// headers, expose headers and methods.
w.Header().Set("Access-Control-Allow-Origin", "*")
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Headers
w.Header().Set("Access-Control-Allow-Headers", "*")
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Methods
w.Header().Set("Access-Control-Allow-Methods", "*")
}
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return nil
}

// Build the stream URL in vhost/app/stream schema.
unifiedURL, fullURL := convertURLToStreamURL(r)
logger.Df(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, fullURL)
Expand All @@ -247,8 +212,7 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt
}

if err = v.serveByBackend(ctx, w, r, backend, streamURL); err != nil {
extraMsg := fmt.Sprintf("serve %v by backend %+v", fullURL, backend)
return wrapProxyError(err, extraMsg)
return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend)
}

return nil
Expand All @@ -267,42 +231,21 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
httpPort = int(iv)
}

// If any goroutine quit, cancel another one.
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)

go func() {
select {
case <-ctx.Done():
case <-r.Context().Done():
// If client request cancelled, cancel the proxy goroutine.
cancel()
}
}()

// Connect to backend SRS server via HTTP client.
backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path)
req, err := http.NewRequestWithContext(ctx, "GET", backendURL, nil)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "create request to %v", backendURL)}
return errors.Wrapf(err, "create request to %v", backendURL)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
if urlErr, ok := err.(*url.Error); ok {
if urlErr.Err == io.EOF {
return &RTMPProxyError{true, errors.Errorf("do request to %v EOF", backendURL)}
}
if urlErr.Err == context.Canceled && r.Context().Err() != nil {
return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")}
}
}
return &RTMPProxyError{true, errors.Wrapf(err, "do request to %v", backendURL)}
return errors.Wrapf(err, "do request to %v", backendURL)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return &RTMPProxyError{true, errors.Errorf("proxy stream to %v failed, status=%v", backendURL, resp.Status)}
return errors.Errorf("proxy stream to %v failed, status=%v", backendURL, resp.Status)
}

// Copy all headers from backend to client.
Expand All @@ -313,64 +256,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
}
}

v.written = true
logger.Df(ctx, "HTTP start streaming")

// For all proxy goroutines.
var wg stdSync.WaitGroup
defer wg.Wait()

// Detect the client closed.
wg.Add(1)
var r0 error
go func() {
defer wg.Done()
defer cancel()

r0 = func() error {
select {
case <-ctx.Done():
return nil
case <-r.Context().Done():
return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")}
}
}()
}()

// Copy all data from backend to client.
wg.Add(1)
var r1 error
go func() {
defer wg.Done()
defer cancel()

r1 = func() error {
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)}
}

if _, err := w.Write(buf[:n]); err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")}
}
}
}()
}()

// Wait until all goroutine quit.
wg.Wait()

// Reset the error if caused by another goroutine.
if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil {
r0 = nil
}
if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil {
r1 = nil
// Proxy the stream from backend to client.
if _, err := io.Copy(w, resp.Body); err != nil {
return errors.Wrapf(err, "copy stream to client, backend=%v", backendURL)
}

return NewRTMPMultipleError(r0, r1, parentCtx.Err())
return nil
}

type HLSStreaming struct {
Expand Down Expand Up @@ -490,7 +383,7 @@ func (v *HLSStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter
// For TS file, directly copy it.
if !strings.HasSuffix(r.URL.Path, ".m3u8") {
if _, err := io.Copy(w, resp.Body); err != nil {
return errors.Wrapf(err, "write stream client")
return errors.Wrapf(err, "copy stream to client, backend=%v", backendURL)
}

return nil
Expand Down

0 comments on commit 746d432

Please sign in to comment.