Skip to content

Commit

Permalink
Reset service on test failure (#426)
Browse files Browse the repository at this point in the history
* reset service on test failure

* clean up errors
  • Loading branch information
frostbyte73 committed Jul 11, 2023
1 parent cbb601d commit d575729
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 32 deletions.
7 changes: 4 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,15 @@ func runService(c *cli.Context) error {

svc.StartDebugHandlers()

return svc.Run()
err = svc.Run()
svc.Close()
return err
}

func runHandler(c *cli.Context) error {
configBody := c.String("config")
if configBody == "" {
err := errors.ErrNoConfig
return err
return errors.ErrNoConfig
}

req := &rpc.StartEgressRequest{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/output/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (b *Bin) buildStreamOutput(p *config.PipelineConfig) (*StreamOutput, error)
return nil, errors.ErrGstPipelineError(err)
}

if err := b.bin.AddMany(mux, tee); err != nil {
if err = b.bin.AddMany(mux, tee); err != nil {
return nil, errors.ErrGstPipelineError(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (p *Pipeline) UpdateStream(ctx context.Context, req *livekit.UpdateStreamRe
}

// add stream
if err := p.out.AddStream(url); err != nil {
if err = p.out.AddStream(url); err != nil {
errs.AppendErr(err)
continue
}
Expand Down
51 changes: 31 additions & 20 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,55 +54,56 @@ func NewService(conf *config.ServiceConfig, bus psrpc.MessageBus, rpcServerV0 eg
}
s.psrpcServer = psrpcServer

err = s.psrpcServer.RegisterStartEgressTopic(conf.ClusterID)
if err != nil {
return nil, err
}

if conf.PrometheusPort > 0 {
s.promServer = &http.Server{
Addr: fmt.Sprintf(":%d", conf.PrometheusPort),
Handler: promhttp.Handler(),
}
}

if err := s.monitor.Start(s.conf, s.isAvailable); err != nil {
if err = s.monitor.Start(s.conf, s.isAvailable); err != nil {
return nil, err
}

return s, nil
}

func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)

if s.promServer != nil {
promListener, err := net.Listen("tcp", s.promServer.Addr)
if err != nil {
return err
return nil, err
}
go func() {
_ = s.promServer.Serve(promListener)
}()
}

return s, nil
}

func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)

if err := s.psrpcServer.RegisterStartEgressTopic(s.conf.ClusterID); err != nil {
return err
}

if s.rpcServerV0 != nil {
return s.runV0()
}

logger.Infow("service ready")

<-s.shutdown.Watch()
logger.Infow("shutting down")
s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID)
for !s.manager.isIdle() {
time.Sleep(shutdownTimer)
}
s.psrpcServer.Shutdown()

return nil
}

func (s *Service) Reset() {
if !s.shutdown.IsBroken() {
s.Stop(false)
}

s.shutdown = core.NewFuse()
}

func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
ctx, span := tracer.Start(ctx, "Service.StartEgress")
defer span.End()
Expand Down Expand Up @@ -181,7 +182,9 @@ func (s *Service) onFatalError(info *livekit.EgressInfo) {
}

func (s *Service) Stop(kill bool) {
s.shutdown.Break()
s.shutdown.Once(func() {
s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID)
})
if kill {
s.manager.killAll()
}
Expand All @@ -190,3 +193,11 @@ func (s *Service) Stop(kill bool) {
func (s *Service) KillAll() {
s.manager.killAll()
}

func (s *Service) Close() {
for !s.manager.isIdle() {
time.Sleep(shutdownTimer)
}
logger.Infow("closing server")
s.psrpcServer.Shutdown()
}
3 changes: 2 additions & 1 deletion pkg/service/service_deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ func (s *Service) runV0() error {
select {
case <-shutdown:
logger.Infow("shutting down")
s.psrpcServer.Shutdown()
s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID)
for !s.manager.isIdle() {
time.Sleep(shutdownTimer)
}
s.psrpcServer.Shutdown()
return nil

case msg := <-requests.Channel():
Expand Down
2 changes: 1 addition & 1 deletion pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *Monitor) Start(conf *config.ServiceConfig, isAvailable func() float64)

m.cpuStats = cpuStats

if err := m.checkCPUConfig(); err != nil {
if err = m.checkCPUConfig(); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions test/room_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (r *Runner) runRoomTest(t *testing.T, name string, audioCodec, videoCodec t
r.awaitIdle(t)
r.publishSamplesToRoom(t, audioCodec, videoCodec)
f(t)
if t.Failed() {
r.svc.Reset()
go r.svc.Run()
}
})
}

Expand Down
10 changes: 5 additions & 5 deletions test/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ func (r *Runner) Run(t *testing.T, bus psrpc.MessageBus, templateFs fs.FS) {
err = svc.StartTemplatesServer(templateFs)
require.NoError(t, err)

go func() {
err := svc.Run()
require.NoError(t, err)
}()
t.Cleanup(func() { svc.Stop(true) })
go svc.Run()
t.Cleanup(func() {
svc.Stop(true)
svc.Close()
})
time.Sleep(time.Second * 3)

// subscribe to update channel
Expand Down
4 changes: 4 additions & 0 deletions test/track_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (r *Runner) runSDKTest(t *testing.T, name string, audioCodec, videoCodec ty
r.awaitIdle(t)
audioTrackID, videoTrackID := r.publishSamplesToRoom(t, audioCodec, videoCodec)
f(t, audioTrackID, videoTrackID)
if t.Failed() {
r.svc.Reset()
go r.svc.Run()
}
})
}

Expand Down
4 changes: 4 additions & 0 deletions test/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (r *Runner) runWebTest(t *testing.T, name string, f func(t *testing.T)) {
t.Run(name, func(t *testing.T) {
r.awaitIdle(t)
f(t)
if t.Failed() {
r.svc.Reset()
go r.svc.Run()
}
})
}

Expand Down

0 comments on commit d575729

Please sign in to comment.