From 63e2e8fa534feacb69cfe45f9afc840a6ced777d Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 3 Sep 2024 16:46:11 +0800 Subject: [PATCH] Refine RTMP streaming error. --- proxy/rtmp.go | 134 ++++++++----------------------------------------- proxy/utils.go | 15 ------ 2 files changed, 20 insertions(+), 129 deletions(-) diff --git a/proxy/rtmp.go b/proxy/rtmp.go index ee9692c3db..0dc28e873e 100644 --- a/proxy/rtmp.go +++ b/proxy/rtmp.go @@ -86,41 +86,11 @@ func (v *rtmpServer) Run(ctx context.Context) error { defer v.wg.Done() defer conn.Close() - var backendClosedErr, clientClosedErr bool - - handleBackendErr := func(err error) { - if isPeerClosedError(err) { - if !backendClosedErr { - backendClosedErr = true - logger.Df(ctx, "RTMP backend peer closed") - } - } else { - logger.Wf(ctx, "RTMP backend err %+v", err) - } - } - - handleClientErr := func(err error) { - if isPeerClosedError(err) { - if !clientClosedErr { - clientClosedErr = true - logger.Df(ctx, "RTMP client peer closed") - } - } else { - logger.Wf(ctx, "RTMP client %v err %+v", conn.RemoteAddr(), err) - } - } - handleErr := func(err error) { - if perr, ok := err.(*RTMPProxyError); ok { - // For proxy error, maybe caused by proxy or client. - if perr.isBackend { - handleBackendErr(perr.err) - } else { - handleClientErr(perr.err) - } + if isPeerClosedError(err) { + logger.Df(ctx, "RTMP peer is closed") } else { - // Default as client error. - handleClientErr(err) + logger.Wf(ctx, "RTMP serve err %+v", err) } } @@ -128,15 +98,7 @@ func (v *rtmpServer) Run(ctx context.Context) error { client.rd = v.rd }) if err := rc.serve(ctx, conn); err != nil { - if merr, ok := err.(*RTMPMultipleError); ok { - // If multiple errors, handle all of them. - for _, err := range merr.errs { - handleErr(err) - } - } else { - // If single error, directly handle it. - handleErr(err) - } + handleErr(err) } else { logger.Df(ctx, "RTMP client done") } @@ -147,60 +109,6 @@ func (v *rtmpServer) Run(ctx context.Context) error { return nil } -type RTMPMultipleError struct { - // The caused errors. - errs []error -} - -// NewRTMPMultipleError ignore nil errors. If no error, return nil. -func NewRTMPMultipleError(errs ...error) error { - var nerrs []error - for _, err := range errs { - if errors.Cause(err) != nil { - nerrs = append(nerrs, err) - } - } - - if len(nerrs) == 0 { - return nil - } - - return &RTMPMultipleError{errs: nerrs} -} - -func (v *RTMPMultipleError) Error() string { - var b strings.Builder - for i, err := range v.errs { - if i > 0 { - b.WriteString(", ") - } - b.WriteString(err.Error()) - } - return b.String() -} - -func (v *RTMPMultipleError) Cause() error { - if len(v.errs) == 0 { - return nil - } - return v.errs[0] -} - -type RTMPProxyError struct { - // Whether error is caused by backend. - isBackend bool - // The caused error. - err error -} - -func (v *RTMPProxyError) Error() string { - return v.err.Error() -} - -func (v *RTMPProxyError) Cause() error { - return v.err -} - type RTMPConnection struct { // The random number generator. rd *rand.Rand @@ -217,13 +125,15 @@ func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection { func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { logger.Df(ctx, "Got RTMP client from %v", conn.RemoteAddr()) - // Close the connection when ctx done. + // If any goroutine quit, cancel another one. + parentCtx := ctx + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var backend *RTMPClientToBackend if true { - connDoneCtx, connDoneCancel := context.WithCancel(ctx) - defer connDoneCancel() go func() { - <-connDoneCtx.Done() + <-ctx.Done() conn.Close() if backend != nil { backend.Close() @@ -380,7 +290,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { defer backend.Close() if err := backend.Connect(ctx, tcUrl, streamName); err != nil { - return &RTMPProxyError{true, errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName)} + return errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName) } // Start the streaming. @@ -424,10 +334,6 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { var wg sync.WaitGroup defer wg.Wait() - // If any goroutine quit, cancel another one. - parentCtx := ctx - ctx, cancel := context.WithCancel(ctx) - // Proxy all message from backend to client. wg.Add(1) var r0 error @@ -439,13 +345,13 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { for { m, err := backend.client.ReadMessage(ctx) if err != nil { - return &RTMPProxyError{true, errors.Wrapf(err, "read message")} + return errors.Wrapf(err, "read message") } //logger.Df(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload)) // TODO: Update the stream ID if not the same. if err := client.WriteMessage(ctx, m); err != nil { - return &RTMPProxyError{false, errors.Wrapf(err, "write message")} + return errors.Wrapf(err, "write message") } } }() @@ -462,13 +368,13 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { for { m, err := client.ReadMessage(ctx) if err != nil { - return &RTMPProxyError{false, errors.Wrapf(err, "read message")} + return errors.Wrapf(err, "read message") } //logger.Df(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload)) // TODO: Update the stream ID if not the same. if err := backend.client.WriteMessage(ctx, m); err != nil { - return &RTMPProxyError{true, errors.Wrapf(err, "write message")} + return errors.Wrapf(err, "write message") } } }() @@ -478,14 +384,14 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error { wg.Wait() // Reset the error if caused by another goroutine. - if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil { - r0 = nil + if r0 != nil { + return errors.Wrapf(r0, "proxy backend->client") } - if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil { - r1 = nil + if r1 != nil { + return errors.Wrapf(r1, "proxy client->backend") } - return NewRTMPMultipleError(r0, r1, parentCtx.Err()) + return parentCtx.Err() } type RTMPClientType string diff --git a/proxy/utils.go b/proxy/utils.go index 43aeea6363..e0115bcd30 100644 --- a/proxy/utils.go +++ b/proxy/utils.go @@ -140,18 +140,3 @@ func convertURLToStreamURL(r *http.Request) (unifiedURL, fullURL string) { fullURL = fmt.Sprintf("%v%v", unifiedURL, streamExt) return } - -// wrapProxyError extract and wrap the proxy and multiple errors with extraMsg. -func wrapProxyError(err error, extraMsg string) error { - if perr, ok := err.(*RTMPProxyError); ok { - return &RTMPProxyError{perr.isBackend, errors.Wrapf(perr.err, extraMsg)} - } else if merr, ok := err.(*RTMPMultipleError); ok { - var errs []error - for _, e := range merr.errs { - errs = append(errs, errors.Wrapf(e, extraMsg)) - } - return NewRTMPMultipleError(errs...) - } else { - return errors.Wrapf(err, extraMsg) - } -}