Skip to content

Commit

Permalink
Support for GST pipeline debug dot files on the debug handler (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben authored Jul 31, 2023
1 parent 40e5535 commit a34f255
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 69 deletions.
274 changes: 206 additions & 68 deletions pkg/ipc/ipc.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions pkg/ipc/ipc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ package ipc;
option go_package = "github.com/livekit/ingress/pkg/ipc";

service IngressHandler {
rpc GetPipelineDot(GstPipelineDebugDotRequest) returns (GstPipelineDebugDotResponse) {};
rpc GetPProf(PProfRequest) returns (PProfResponse) {};
rpc UpdateMediaStats(UpdateMediaStatsRequest) returns (google.protobuf.Empty) {};
}

message GstPipelineDebugDotRequest {}

message GstPipelineDebugDotResponse {
string dot_file = 1;
}

message PProfRequest {
string profile_name = 1;
int32 timeout = 2;
Expand Down
51 changes: 51 additions & 0 deletions pkg/ipc/ipc_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/media/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ func (p *Pipeline) SendEOS(ctx context.Context) {
})
}

func (p *Pipeline) GetGstPipelineDebugDot() string {
return p.pipeline.DebugBinToDotData(gst.DebugGraphShowAll)
}

func getKindFromGstMimeType(gstStruct *gst.Structure) types.StreamKind {
gstMimeType := gstStruct.Name()

Expand Down
30 changes: 29 additions & 1 deletion pkg/service/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
pprofApp = "pprof"
gstPipelineDotFileApp = "gst_pipeline"
pprofApp = "pprof"
)

func (s *Service) StartDebugHandlers() {
Expand All @@ -38,6 +39,7 @@ func (s *Service) StartDebugHandlers() {
}

mux := http.NewServeMux()
mux.HandleFunc(fmt.Sprintf("/%s/", gstPipelineDotFileApp), s.handleGstPipelineDotFile)
mux.HandleFunc(fmt.Sprintf("/%s/", pprofApp), s.handlePProf)

go func() {
Expand All @@ -47,6 +49,32 @@ func (s *Service) StartDebugHandlers() {
}()
}

func (s *Service) GetGstPipelineDotFile(resourceID string) (string, error) {
api, err := s.sm.GetIngressSessionAPI(resourceID)
if err != nil {
return "", err
}

return api.GetPipelineDot(context.Background())
}

// URL path format is "/<application>/<ingress_id>/<optional_other_params>"
func (s *Service) handleGstPipelineDotFile(w http.ResponseWriter, r *http.Request) {
pathElements := strings.Split(r.URL.Path, "/")
if len(pathElements) < 3 {
http.Error(w, "malformed url", http.StatusNotFound)
return
}

resourceID := pathElements[2]
dotFile, err := s.GetGstPipelineDotFile(resourceID)
if err != nil {
http.Error(w, err.Error(), getErrorCode(err))
return
}
_, _ = w.Write([]byte(dotFile))
}

// URL path format is "/<application>/<resource_id>/<profile_name>" or "/<application>/<profile_name>" to profile the service
func (s *Service) handlePProf(w http.ResponseWriter, r *http.Request) {
var err error
Expand Down
28 changes: 28 additions & 0 deletions pkg/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package service
import (
"context"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
google_protobuf2 "google.golang.org/protobuf/types/known/emptypb"

"github.com/frostbyte73/core"
Expand Down Expand Up @@ -158,6 +161,31 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr
}, nil
}

func (h *Handler) GetPipelineDot(ctx context.Context, in *ipc.GstPipelineDebugDotRequest) (*ipc.GstPipelineDebugDotResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetPipelineDot")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrIngressNotFound
}

res := make(chan string, 1)
go func() {
res <- h.pipeline.GetGstPipelineDebugDot()
}()

select {
case r := <-res:
return &ipc.GstPipelineDebugDotResponse{
DotFile: r,
}, nil

case <-time.After(2 * time.Second):
return nil, status.New(codes.DeadlineExceeded, "timed out requesting pipeline debug info").Err()
}

}

func (h *Handler) UpdateMediaStats(ctx context.Context, in *ipc.UpdateMediaStatsRequest) (*google_protobuf2.Empty, error) {
ctx, span := tracer.Start(ctx, "Handler.UpdateMediaStats")
defer span.End()
Expand Down
11 changes: 11 additions & 0 deletions pkg/service/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ func (p *process) GetProfileData(ctx context.Context, profileName string, timeou
return resp.PprofFile, nil
}

func (p *process) GetPipelineDot(ctx context.Context) (string, error) {
req := &ipc.GstPipelineDebugDotRequest{}

resp, err := p.grpcClient.GetPipelineDot(ctx, req)
if err != nil {
return "", err
}

return resp.DotFile, nil
}

func (p *process) UpdateMediaStats(ctx context.Context, s *types.MediaStats) error {
req := &ipc.UpdateMediaStatsRequest{
AudioAverateBitrate: s.AudioAverageBitrate,
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ func (a *localSessionAPI) GetProfileData(ctx context.Context, profileName string
return pprof.GetProfileData(ctx, profileName, timeout, debug)
}

func (a *localSessionAPI) GetPipelineDot(ctx context.Context) (string, error) {
// No dot file if transcoding is disabled
return "", errors.ErrIngressNotFound
}

func (a *localSessionAPI) UpdateMediaStats(ctx context.Context, s *types.MediaStats) error {
if s.AudioAverageBitrate != nil && s.AudioCurrentBitrate != nil {
a.params.SetInputAudioBitrate(*s.AudioAverageBitrate, *s.AudioCurrentBitrate)
Expand Down
1 change: 1 addition & 0 deletions pkg/types/session_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "context"

type SessionAPI interface {
GetProfileData(ctx context.Context, profileName string, timeout int, debug int) (b []byte, err error)
GetPipelineDot(ctx context.Context) (string, error)
UpdateMediaStats(ctx context.Context, stats *MediaStats) error
}

Expand Down

0 comments on commit a34f255

Please sign in to comment.