Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for GST pipeline debug dot files on the debug handler #136

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 206 additions & 68 deletions pkg/ipc/ipc.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions pkg/ipc/ipc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ package ipc;
option go_package = "github.com/livekit/ingress/pkg/ipc";

service IngressHandler {
rpc GetPipelineDot(GstPipelineDebugDotRequest) returns (GstPipelineDebugDotResponse) {};
rpc GetPProf(PProfRequest) returns (PProfResponse) {};
rpc UpdateMediaStats(UpdateMediaStatsRequest) returns (google.protobuf.Empty) {};
}

message GstPipelineDebugDotRequest {}

message GstPipelineDebugDotResponse {
string dot_file = 1;
}

message PProfRequest {
string profile_name = 1;
int32 timeout = 2;
Expand Down
51 changes: 51 additions & 0 deletions pkg/ipc/ipc_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/media/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ func (p *Pipeline) SendEOS(ctx context.Context) {
})
}

func (p *Pipeline) GetGstPipelineDebugDot() string {
return p.pipeline.DebugBinToDotData(gst.DebugGraphShowAll)
}

func getKindFromGstMimeType(gstStruct *gst.Structure) types.StreamKind {
gstMimeType := gstStruct.Name()

Expand Down
30 changes: 29 additions & 1 deletion pkg/service/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
pprofApp = "pprof"
gstPipelineDotFileApp = "gst_pipeline"
pprofApp = "pprof"
)

func (s *Service) StartDebugHandlers() {
Expand All @@ -38,6 +39,7 @@ func (s *Service) StartDebugHandlers() {
}

mux := http.NewServeMux()
mux.HandleFunc(fmt.Sprintf("/%s/", gstPipelineDotFileApp), s.handleGstPipelineDotFile)
mux.HandleFunc(fmt.Sprintf("/%s/", pprofApp), s.handlePProf)

go func() {
Expand All @@ -47,6 +49,32 @@ func (s *Service) StartDebugHandlers() {
}()
}

func (s *Service) GetGstPipelineDotFile(resourceID string) (string, error) {
api, err := s.sm.GetIngressSessionAPI(resourceID)
if err != nil {
return "", err
}

return api.GetPipelineDot(context.Background())
}

// URL path format is "/<application>/<ingress_id>/<optional_other_params>"
func (s *Service) handleGstPipelineDotFile(w http.ResponseWriter, r *http.Request) {
pathElements := strings.Split(r.URL.Path, "/")
if len(pathElements) < 3 {
http.Error(w, "malformed url", http.StatusNotFound)
return
}

resourceID := pathElements[2]
dotFile, err := s.GetGstPipelineDotFile(resourceID)
if err != nil {
http.Error(w, err.Error(), getErrorCode(err))
return
}
_, _ = w.Write([]byte(dotFile))
}

// URL path format is "/<application>/<resource_id>/<profile_name>" or "/<application>/<profile_name>" to profile the service
func (s *Service) handlePProf(w http.ResponseWriter, r *http.Request) {
var err error
Expand Down
28 changes: 28 additions & 0 deletions pkg/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package service
import (
"context"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
google_protobuf2 "google.golang.org/protobuf/types/known/emptypb"

"github.com/frostbyte73/core"
Expand Down Expand Up @@ -158,6 +161,31 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr
}, nil
}

func (h *Handler) GetPipelineDot(ctx context.Context, in *ipc.GstPipelineDebugDotRequest) (*ipc.GstPipelineDebugDotResponse, error) {
ctx, span := tracer.Start(ctx, "Handler.GetPipelineDot")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrIngressNotFound
}

res := make(chan string, 1)
go func() {
res <- h.pipeline.GetGstPipelineDebugDot()
}()

select {
case r := <-res:
return &ipc.GstPipelineDebugDotResponse{
DotFile: r,
}, nil

case <-time.After(2 * time.Second):
return nil, status.New(codes.DeadlineExceeded, "timed out requesting pipeline debug info").Err()
}

}

func (h *Handler) UpdateMediaStats(ctx context.Context, in *ipc.UpdateMediaStatsRequest) (*google_protobuf2.Empty, error) {
ctx, span := tracer.Start(ctx, "Handler.UpdateMediaStats")
defer span.End()
Expand Down
11 changes: 11 additions & 0 deletions pkg/service/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ func (p *process) GetProfileData(ctx context.Context, profileName string, timeou
return resp.PprofFile, nil
}

func (p *process) GetPipelineDot(ctx context.Context) (string, error) {
req := &ipc.GstPipelineDebugDotRequest{}

resp, err := p.grpcClient.GetPipelineDot(ctx, req)
if err != nil {
return "", err
}

return resp.DotFile, nil
}

func (p *process) UpdateMediaStats(ctx context.Context, s *types.MediaStats) error {
req := &ipc.UpdateMediaStatsRequest{
AudioAverateBitrate: s.AudioAverageBitrate,
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ func (a *localSessionAPI) GetProfileData(ctx context.Context, profileName string
return pprof.GetProfileData(ctx, profileName, timeout, debug)
}

func (a *localSessionAPI) GetPipelineDot(ctx context.Context) (string, error) {
// No dot file if transcoding is disabled
return "", errors.ErrIngressNotFound
}

func (a *localSessionAPI) UpdateMediaStats(ctx context.Context, s *types.MediaStats) error {
if s.AudioAverageBitrate != nil && s.AudioCurrentBitrate != nil {
a.params.SetInputAudioBitrate(*s.AudioAverageBitrate, *s.AudioCurrentBitrate)
Expand Down
1 change: 1 addition & 0 deletions pkg/types/session_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "context"

type SessionAPI interface {
GetProfileData(ctx context.Context, profileName string, timeout int, debug int) (b []byte, err error)
GetPipelineDot(ctx context.Context) (string, error)
UpdateMediaStats(ctx context.Context, stats *MediaStats) error
}

Expand Down