Skip to content

Commit

Permalink
Improve Ingress A/V sync (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben authored Oct 16, 2023
1 parent 9a0e2c4 commit a02c801
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 117 deletions.
26 changes: 13 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ require (
github.com/gorilla/mux v1.8.0
github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a
github.com/livekit/protocol v1.6.2-0.20230824204903-ecca17670daa
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f
github.com/livekit/protocol v1.7.2
github.com/livekit/psrpc v0.3.3
github.com/livekit/server-sdk-go v1.0.16-0.20230815025737-c12cd2eb8fe8
github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b
github.com/pion/dtls/v2 v2.2.7
github.com/pion/interceptor v0.1.17
github.com/pion/interceptor v0.1.18
github.com/pion/rtcp v1.2.10
github.com/pion/rtp v1.8.1
github.com/pion/webrtc/v3 v3.2.16
github.com/pion/webrtc/v3 v3.2.20
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.4
Expand All @@ -28,7 +28,7 @@ require (
github.com/yutopp/go-flv v0.2.0
go.uber.org/atomic v1.11.0
golang.org/x/image v0.7.0
google.golang.org/grpc v1.57.0
google.golang.org/grpc v1.58.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -47,7 +47,7 @@ require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
Expand All @@ -64,15 +64,15 @@ require (
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/ice/v2 v2.3.10 // indirect
github.com/pion/ice/v2 v2.3.11 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.7 // indirect
github.com/pion/mdns v0.0.8 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.7 // indirect
github.com/pion/sctp v1.8.8 // indirect
github.com/pion/sdp/v3 v3.0.6 // indirect
github.com/pion/srtp/v2 v2.0.16 // indirect
github.com/pion/srtp/v2 v2.0.17 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.1 // indirect
github.com/pion/transport/v2 v2.2.3 // indirect
github.com/pion/turn/v2 v2.1.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -89,7 +89,7 @@ require (
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
74 changes: 36 additions & 38 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func (c *Config) InitWhipConf() error {
return nil
}

if c.RTCConfig.UDPPort == 0 && c.RTCConfig.ICEPortRangeStart == 0 {
c.RTCConfig.UDPPort = 7885
if c.RTCConfig.UDPPort.Start == 0 && c.RTCConfig.ICEPortRangeStart == 0 {
c.RTCConfig.UDPPort.Start = 7885
}

// Validate will set the NodeIP
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
ErrIngressNotFound = psrpc.NewErrorf(psrpc.NotFound, "ingress not found")
ErrServerCapacityExceeded = psrpc.NewErrorf(psrpc.ResourceExhausted, "server capacity exceeded")
ErrServerShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server shutting down")
ErrIngressClosing = psrpc.NewErrorf(psrpc.Unavailable, "ingress closing")
ErrMissingStreamKey = psrpc.NewErrorf(psrpc.InvalidArgument, "missing stream key")
ErrPrerollBufferReset = psrpc.NewErrorf(psrpc.Internal, "preroll buffer reset")
)
Expand Down
96 changes: 66 additions & 30 deletions pkg/media/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package media
import (
"fmt"
"io"
"time"

"github.com/frostbyte73/core"
"github.com/pion/webrtc/v3/pkg/media"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/tinyzimmer/go-gst/gst/app"

"github.com/livekit/ingress/pkg/errors"
"github.com/livekit/ingress/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
Expand All @@ -41,14 +43,20 @@ type Output struct {
bin *gst.Bin
logger logger.Logger

elements []*gst.Element
enc *gst.Element
sink *app.Sink
elements []*gst.Element
enc *gst.Element
sink *app.Sink
outputSync *utils.TrackOutputSynchronizer

samples chan *media.Sample
samples chan *sample
fuse core.Fuse
}

type sample struct {
s *media.Sample
ts time.Duration
}

// FIXME Use generics instead?
type VideoOutput struct {
*Output
Expand All @@ -62,8 +70,8 @@ type AudioOutput struct {
codec livekit.AudioCodec
}

func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer) (*VideoOutput, error) {
e, err := newVideoOutput(codec)
func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputSync *utils.TrackOutputSynchronizer) (*VideoOutput, error) {
e, err := newVideoOutput(codec, outputSync)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,8 +215,8 @@ func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer) (*Video
return e, nil
}

func NewAudioOutput(options *livekit.IngressAudioEncodingOptions) (*AudioOutput, error) {
e, err := newAudioOutput(options.AudioCodec)
func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *utils.TrackOutputSynchronizer) (*AudioOutput, error) {
e, err := newAudioOutput(options.AudioCodec, outputSync)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -294,8 +302,8 @@ func NewAudioOutput(options *livekit.IngressAudioEncodingOptions) (*AudioOutput,
return e, nil
}

func newVideoOutput(codec livekit.VideoCodec) (*VideoOutput, error) {
e, err := newOutput()
func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynchronizer) (*VideoOutput, error) {
e, err := newOutput(outputSync)
if err != nil {
return nil, err
}
Expand All @@ -313,8 +321,8 @@ func newVideoOutput(codec livekit.VideoCodec) (*VideoOutput, error) {
return o, nil
}

func newAudioOutput(codec livekit.AudioCodec) (*AudioOutput, error) {
e, err := newOutput()
func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynchronizer) (*AudioOutput, error) {
e, err := newOutput(outputSync)
if err != nil {
return nil, err
}
Expand All @@ -332,16 +340,17 @@ func newAudioOutput(codec livekit.AudioCodec) (*AudioOutput, error) {
return o, nil
}

func newOutput() (*Output, error) {
func newOutput(outputSync *utils.TrackOutputSynchronizer) (*Output, error) {
sink, err := app.NewAppSink()
if err != nil {
return nil, err
}

e := &Output{
sink: sink,
samples: make(chan *media.Sample, 100),
fuse: core.NewFuse(),
sink: sink,
outputSync: outputSync,
samples: make(chan *sample, 100),
fuse: core.NewFuse(),
}

return e, nil
Expand Down Expand Up @@ -381,28 +390,38 @@ func (e *Output) handleEOS(_ *app.Sink) {
e.fuse.Break()
}

func (e *Output) writeSample(sample *media.Sample) error {
func (e *Output) writeSample(s *media.Sample, pts time.Duration) error {
select {
case e.samples <- sample:
case e.samples <- &sample{s, pts}:
return nil
case <-e.fuse.Watch():
return io.EOF
}
}

func (e *Output) NextSample() (media.Sample, error) {
var sample *media.Sample
var s *sample

select {
case sample = <-e.samples:
case <-e.fuse.Watch():
}
for {
select {
case s = <-e.samples:
case <-e.fuse.Watch():
}

if sample == nil {
return media.Sample{}, io.EOF
}
if s == nil {
return media.Sample{}, io.EOF
}

return *sample, nil
drop, err := e.outputSync.WaitForMediaTime(s.ts)
if err != nil {
return media.Sample{}, err
}
if drop {
e.logger.Debugw("Dropping sample", "timestamp", s.ts)
continue
}
return *s.s, nil
}
}

func (e *Output) OnBind() error {
Expand All @@ -420,6 +439,7 @@ func (e *Output) OnUnbind() error {
func (e *Output) Close() error {

e.fuse.Break()
e.outputSync.Close()

return nil
}
Expand All @@ -437,7 +457,15 @@ func (e *VideoOutput) handleSample(sink *app.Sink) gst.FlowReturn {
return gst.FlowError
}

segment := s.GetSegment()
if buffer == nil {
return gst.FlowError
}

duration := buffer.Duration()
pts := buffer.PresentationTimestamp()

ts := time.Duration(segment.ToRunningTime(gst.FormatTime, uint64(pts)))

var err error
switch e.codec {
Expand Down Expand Up @@ -480,14 +508,14 @@ func (e *VideoOutput) handleSample(sink *app.Sink) gst.FlowReturn {
err = e.writeSample(&media.Sample{
Data: buffer.Bytes(),
Duration: duration,
})
}, ts)

case livekit.VideoCodec_VP8:
// untested
err = e.writeSample(&media.Sample{
Data: buffer.Bytes(),
Duration: duration,
})
}, ts)
}

return errors.ErrorToGstFlowReturn(err)
Expand All @@ -506,7 +534,15 @@ func (e *AudioOutput) handleSample(sink *app.Sink) gst.FlowReturn {
return gst.FlowError
}

segment := s.GetSegment()
if buffer == nil {
return gst.FlowError
}

duration := buffer.Duration()
pts := buffer.PresentationTimestamp()

ts := time.Duration(segment.ToRunningTime(gst.FormatTime, uint64(pts)))

var err error

Expand All @@ -515,7 +551,7 @@ func (e *AudioOutput) handleSample(sink *app.Sink) gst.FlowReturn {
err = e.writeSample(&media.Sample{
Data: buffer.Bytes(),
Duration: duration,
})
}, ts)
}

return errors.ErrorToGstFlowReturn(err)
Expand Down
19 changes: 11 additions & 8 deletions pkg/media/webrtc_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ import (
"github.com/livekit/ingress/pkg/lksdk_output"
"github.com/livekit/ingress/pkg/params"
"github.com/livekit/ingress/pkg/types"
"github.com/livekit/ingress/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"
putils "github.com/livekit/protocol/utils"
)

type WebRTCSink struct {
params *params.Params

sdkOut *lksdk_output.LKSDKOutput
sdkOut *lksdk_output.LKSDKOutput
outputSync *utils.OutputSynchronizer
}

func NewWebRTCSink(ctx context.Context, p *params.Params) (*WebRTCSink, error) {
Expand All @@ -45,19 +47,20 @@ func NewWebRTCSink(ctx context.Context, p *params.Params) (*WebRTCSink, error) {
}

return &WebRTCSink{
params: p,
sdkOut: sdkOut,
params: p,
sdkOut: sdkOut,
outputSync: utils.NewOutputSynchronizer(),
}, nil
}

func (s *WebRTCSink) addAudioTrack() (*Output, error) {
output, err := NewAudioOutput(s.params.AudioEncodingOptions)
output, err := NewAudioOutput(s.params.AudioEncodingOptions, s.outputSync.AddTrack())
if err != nil {
logger.Errorw("could not create output", err)
return nil, err
}

err = s.sdkOut.AddAudioTrack(output, utils.GetMimeTypeForAudioCodec(s.params.AudioEncodingOptions.AudioCodec), s.params.AudioEncodingOptions.DisableDtx, s.params.AudioEncodingOptions.Channels > 1)
err = s.sdkOut.AddAudioTrack(output, putils.GetMimeTypeForAudioCodec(s.params.AudioEncodingOptions.AudioCodec), s.params.AudioEncodingOptions.DisableDtx, s.params.AudioEncodingOptions.Channels > 1)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +76,7 @@ func (s *WebRTCSink) addVideoTrack(w, h int) ([]*Output, error) {

var outLayers []*livekit.VideoLayer
for _, layer := range sortedLayers {
output, err := NewVideoOutput(s.params.VideoEncodingOptions.VideoCodec, layer)
output, err := NewVideoOutput(s.params.VideoEncodingOptions.VideoCodec, layer, s.outputSync.AddTrack())
if err != nil {
return nil, err
}
Expand All @@ -82,7 +85,7 @@ func (s *WebRTCSink) addVideoTrack(w, h int) ([]*Output, error) {
outLayers = append(outLayers, layer)
}

err := s.sdkOut.AddVideoTrack(sbArray, outLayers, utils.GetMimeTypeForVideoCodec(s.params.VideoEncodingOptions.VideoCodec))
err := s.sdkOut.AddVideoTrack(sbArray, outLayers, putils.GetMimeTypeForVideoCodec(s.params.VideoEncodingOptions.VideoCodec))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a02c801

Please sign in to comment.