From d774e61e6bc9c7fa672d44d9d51d9779e39a2a63 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 11 Sep 2024 10:37:00 -0700 Subject: [PATCH] use config value for max upload queue size --- pkg/config/base.go | 1 + pkg/config/service.go | 4 ++++ pkg/pipeline/sink/image.go | 2 +- pkg/pipeline/sink/segments.go | 2 +- 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/config/base.go b/pkg/config/base.go index 4049eae4..6da0f6c4 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -39,6 +39,7 @@ type BaseConfig struct { BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration + MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS) SessionLimits `yaml:"session_limits"` // session duration limits diff --git a/pkg/config/service.go b/pkg/config/service.go index 4ea60381..135ee2db 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -37,6 +37,7 @@ const ( trackCpuCost = 0.5 maxCpuUtilization = 0.8 maxConcurrentWeb = 18 + maxUploadQueue = 60 defaultTemplatePort = 7980 defaultTemplateBaseTemplate = "http://localhost:%d/" @@ -115,6 +116,9 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) { if conf.MaxConcurrentWeb <= 0 { conf.MaxConcurrentWeb = maxConcurrentWeb } + if conf.MaxUploadQueue <= 0 { + conf.MaxUploadQueue = maxUploadQueue + } if conf.TemplateBase == "" { conf.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, conf.TemplatePort) diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index 34df0efe..8bfe2e7d 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -54,7 +54,7 @@ type imageUpdate struct { } func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.ImageConfig, callbacks *gstreamer.Callbacks) (*ImageSink, error) { - maxPendingUploads := 900 / o.CaptureInterval + maxPendingUploads := (p.MaxUploadQueue * 60) / int(o.CaptureInterval) return &ImageSink{ Uploader: u, ImageConfig: o, diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 7c4ec055..727f772a 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -90,7 +90,7 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg outputType = types.OutputTypeTS } - maxPendingUploads := 900 / o.SegmentDuration + maxPendingUploads := (p.MaxUploadQueue * 60) / o.SegmentDuration s := &SegmentSink{ Uploader: u, SegmentConfig: o,