From f42792a1065070df6861f12a65d36cbdd0b0a922 Mon Sep 17 00:00:00 2001 From: igorshevach Date: Mon, 6 Nov 2023 10:48:41 +0200 Subject: [PATCH 1/8] - add throttler.h,c - define config throttler.maxDataRate as max limit on transcoder usage. * measured in times x normal fps. e.g. if maxDataRate = 2 we expect no more than twice the original throughput. * By default maxDataRate is set to Infinity and no data rate limit is performed. --- transcoder/common/json_parser.c | 2 +- transcoder/config_v.json | 7 ++- transcoder/receiver_server.c | 9 +++- transcoder/utils/logger.h | 1 + transcoder/utils/samples_stats.c | 9 +++- transcoder/utils/samples_stats.h | 1 + transcoder/utils/throttler.c | 86 ++++++++++++++++++++++++++++++++ transcoder/utils/throttler.h | 20 ++++++++ 8 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 transcoder/utils/throttler.c create mode 100644 transcoder/utils/throttler.h diff --git a/transcoder/common/json_parser.c b/transcoder/common/json_parser.c index f8957af5..9e1c9536 100644 --- a/transcoder/common/json_parser.c +++ b/transcoder/common/json_parser.c @@ -1000,6 +1000,6 @@ json_status_t json_get_double(const json_value_t* obj,char* path,double defaultV if (jresult->type!=JSON_FRAC) { return JSON_BAD_DATA; } - *result = ((double)jresult->v.num.denom) / ((double)jresult->v.num.num); + *result = ((double)jresult->v.num.num) / ((double)jresult->v.num.denom); return JSON_OK; } diff --git a/transcoder/config_v.json b/transcoder/config_v.json index 229b36e7..aacf72f4 100644 --- a/transcoder/config_v.json +++ b/transcoder/config_v.json @@ -1,12 +1,17 @@ { "input": { "file": "/media/worng_order_video_only.mp4", - "realTime": true, + "realTime": false, "activeStream": 0, "xduration": 9000000, "randomDataPercentage": 0, "jumpOffsetSec": -60 }, + "throttler": { + "maxDataRate": 2.5, + "coldSeconds": 0, + "minThrottleWaitMs": 1 + }, "frameDropper1": { "enabled": false, "queueDuration": 10, diff --git a/transcoder/receiver_server.c b/transcoder/receiver_server.c index 5b5529ae..d04f5365 100644 --- a/transcoder/receiver_server.c +++ b/transcoder/receiver_server.c @@ -9,6 +9,7 @@ #include "receiver_server.h" #include "KMP/KMP.h" #include "transcode_session.h" +#include "utils/throttler.h" int atomFileWrite (char* fileName,char* content,size_t size) @@ -27,8 +28,9 @@ int atomFileWrite (char* fileName,char* content,size_t size) int processedFrameCB(receiver_server_session_t *session,bool completed) { uint64_t now=av_gettime(); + receiver_server_t *server=session->server; if (completed || now-session->lastStatsUpdated>session->diagnosticsIntervalInSeconds) {//1 second interval - receiver_server_t *server=session->server; + char* tmpBuf=av_malloc(MAX_DIAGNOSTICS_STRING_LENGTH); @@ -67,9 +69,11 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran bool autoAckMode; uint64_t received_frame_id=0; kmp_frame_position_t current_position; + throttler_t throttler = {0}; json_get_bool(GetConfig(),"autoAckModeEnabled",false,&autoAckMode); + _S(throttler_init(&server->receiverStats,&throttler)); while (retVal >= 0 && session->kmpClient.socket) { @@ -119,6 +123,9 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size); pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section + + throttler_process(&throttler,transcode_session); + if(add_packet_frame_id_and_pts(packet,received_frame_id,packet->pts)){ LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id); } diff --git a/transcoder/utils/logger.h b/transcoder/utils/logger.h index dfd5f9fa..8ac370c8 100644 --- a/transcoder/utils/logger.h +++ b/transcoder/utils/logger.h @@ -20,6 +20,7 @@ #define CATEGORY_RECEIVER "RECEIVER" #define CATEGORY_KMP "KMP" #define CATEGORY_HTTP_SERVER "HTTPSERVER" +#define CATEGORY_THROTTLER "THROTTLER" void logger1(const char* category,int level,const char *fmt, ...); void loggerFlush(); diff --git a/transcoder/utils/samples_stats.c b/transcoder/utils/samples_stats.c index c99459fe..59fdfaea 100644 --- a/transcoder/utils/samples_stats.c +++ b/transcoder/utils/samples_stats.c @@ -24,6 +24,7 @@ void sample_stats_init(samples_stats_t* pStats,AVRational basetime) pStats->firstTimeStamp=0; pStats->lastTimeStamp=0; pStats->lastDts=0; + pStats->throttleWait=0; } void drain(samples_stats_t* pStats,uint64_t clock) @@ -102,12 +103,15 @@ void sample_stats_get_diagnostics(samples_stats_t *pStats,json_writer_ctx_t js) JSON_SERIALIZE_STRING("firstTimeStamp",pStats->firstTimeStamp>0 ? ts2str(pStats->firstTimeStamp,false): "N/A") JSON_SERIALIZE_STRING("lastTimeStamp",pStats->lastTimeStamp>0 ? ts2str(pStats->lastTimeStamp,false) : "N/A") JSON_SERIALIZE_INT64("lastDts",pStats->lastDts) + if(pStats->throttleWait > 0) { + JSON_SERIALIZE_INT64("throttle",pStats->throttleWait) + } } void samples_stats_log(const char* category,int level,samples_stats_t *stats,const char *prefix) { - LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf", + LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf throttleWait %s", prefix, stats->totalFrames, stats->totalErrors, @@ -116,5 +120,6 @@ void samples_stats_log(const char* category,int level,samples_stats_t *stats,con pts2str(stats->clockDrift), ((double)stats->currentBitRate)/(1000.0), stats->currentFrameRate, - stats->currentRate) + stats->currentRate, + pts2str(stats->throttleWait)) } diff --git a/transcoder/utils/samples_stats.h b/transcoder/utils/samples_stats.h index 0169bfaa..295c92ea 100644 --- a/transcoder/utils/samples_stats.h +++ b/transcoder/utils/samples_stats.h @@ -41,6 +41,7 @@ typedef struct uint64_t firstTimeStamp,lastTimeStamp; int64_t timeStampPassed; int64_t clockDrift; + int64_t throttleWait; } samples_stats_t; void sample_stats_init(samples_stats_t* pStats,AVRational basetime); diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c new file mode 100644 index 00000000..6acac7dc --- /dev/null +++ b/transcoder/utils/throttler.c @@ -0,0 +1,86 @@ +// +// throttler.c +// live_transcoder +// +// + + +#include "../transcode/transcode_session.h" +#include "throttler.h" +#include "json_parser.h" + +static void doThrottle(float maxDataRate, + int coldSeconds, + int minThrottleWaitMs, + samples_stats_t *stats, + AVRational targetFramerate); + +int +throttler_init(samples_stats_t *stats,throttler_t *throttler) { + json_value_t *config = GetConfig(); + json_get_double(config,"throttler.maxDataRate",INFINITY,(double*)&throttler->maxDataRate); + *(bool*)&throttler->enabled = throttler->maxDataRate < INFINITY; + if(throttler->enabled){ + json_get_int(config,"throttler.coldSeconds",0,(int*)&throttler->coldSeconds); + json_get_int(config,"throttler.minThrottleWaitMs",1,(int*)&throttler->minThrottleWaitMs); + throttler->stats = stats; + } + return 0; +} + +void +throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) { + if(throttler && throttler->maxDataRate < INFINITY) { + const transcode_mediaInfo_t *mediaInfo = transcode_session ? transcode_session->currentMediaInfo : NULL; + if(mediaInfo && mediaInfo->codecParams){ + const bool isVideo = mediaInfo->codecParams->codec_type == AVMEDIA_TYPE_VIDEO; + const AVRational frameRate = isVideo ? mediaInfo->frameRate : + (AVRational){ .num = mediaInfo->codecParams->sample_rate , + .den = 1024 }; + + doThrottle(throttler->maxDataRate, + throttler->coldSeconds, + throttler->minThrottleWaitMs, + throttler->stats, + frameRate); + } + } +} + +static +void +doThrottle(float maxDataRate, + int coldSeconds, + int minThrottleWaitMs, + samples_stats_t *stats, + AVRational targetFramerate) +{ + + if(targetFramerate.den > 0 && targetFramerate.num > 0) { + float currentDataRate; + + samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"throttleThread-Stats"); + + if(stats->totalFrames < coldSeconds * targetFramerate.num / targetFramerate.den) { + return; + } + + currentDataRate = stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; + + LOGGER(CATEGORY_THROTTLER, + AV_LOG_DEBUG,"throttleThread. data rate current: %.3f max: %.3f", + currentDataRate, + maxDataRate); + + if(currentDataRate > maxDataRate) { + // going to sleep for a period of time gained due to race + int throttleWaitUSec = (currentDataRate - maxDataRate) * 1000 * 1000; //av_rescale(1000 * 1000,targetFramerate.den,targetFramerate.num); + const int minThrottleWaitMs = 1; + if(throttleWaitUSec > minThrottleWaitMs * 1000) { + LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"throttleThread. throttling %.3f ms",throttleWaitUSec / 1000.f); + stats->throttleWait += av_rescale_q( throttleWaitUSec, clockScale, standard_timebase); + av_usleep(throttleWaitUSec); + } + } + } +} \ No newline at end of file diff --git a/transcoder/utils/throttler.h b/transcoder/utils/throttler.h new file mode 100644 index 00000000..c614ca92 --- /dev/null +++ b/transcoder/utils/throttler.h @@ -0,0 +1,20 @@ +// live_transcoder +// + +#ifndef Throttler_h +#define Throttler_h + +typedef struct { + const bool enabled; + const double maxDataRate; + const int coldSeconds; + const int minThrottleWaitMs; + samples_stats_t *stats; +} throttler_t; + +int throttler_init(samples_stats_t *stats,throttler_t *throttler); + +void +throttler_process(throttler_t *throttler,transcode_session_t *session); + +#endif \ No newline at end of file From d752bbe617ede2b1a1812ab7335207fff015a7d6 Mon Sep 17 00:00:00 2001 From: igorshevach Date: Mon, 6 Nov 2023 11:48:49 +0200 Subject: [PATCH 2/8] - file streamer loop fixes --- transcoder/config_v.json | 2 +- transcoder/debug/file_streamer.c | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/transcoder/config_v.json b/transcoder/config_v.json index aacf72f4..b0450bab 100644 --- a/transcoder/config_v.json +++ b/transcoder/config_v.json @@ -8,7 +8,7 @@ "jumpOffsetSec": -60 }, "throttler": { - "maxDataRate": 2.5, + "maxDataRate": 1.5, "coldSeconds": 0, "minThrottleWaitMs": 1 }, diff --git a/transcoder/debug/file_streamer.c b/transcoder/debug/file_streamer.c index ba816d8c..31223ae8 100644 --- a/transcoder/debug/file_streamer.c +++ b/transcoder/debug/file_streamer.c @@ -142,12 +142,14 @@ void* thread_stream_from_file(void *vargp) if (realTime && lastDts > 0) { - int64_t timePassed=av_rescale_q(packet.dts-lastDts,standard_timebase,AV_TIME_BASE_Q); - //LOGGER("SENDER",AV_LOG_DEBUG,"XXXX dt=%ld dd=%ld", (av_gettime_relative() - start_time),timePassed); - while ((av_gettime_relative() - start_time) < timePassed) { + int64_t timePassed=av_rescale_q(packet.dts,standard_timebase,AV_TIME_BASE_Q) + start_time, + clockPassed = av_gettime_relative(); + LOGGER("SENDER",AV_LOG_DEBUG,"XXXX clockPassed=%ld timePassed=%ld", clockPassed - start_time,timePassed - start_time); + while (clockPassed < timePassed) { LOGGER0("SENDER",AV_LOG_DEBUG,"XXXX Sleep 10ms"); av_usleep(10*1000);//10ms + clockPassed = av_gettime_relative(); } } @@ -179,7 +181,7 @@ void* thread_stream_from_file(void *vargp) LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d", ts2str(packet.pts,true), ts2str(packet.dts,true), - packet.dts,packet.size); + packet.size); av_packet_unref(&packet); From 572ece93bed4e3063dbbc8354a61dd90534f2e9c Mon Sep 17 00:00:00 2001 From: igorshevach Date: Mon, 6 Nov 2023 13:31:42 +0200 Subject: [PATCH 3/8] - add hiccup simulation to file streamer --- transcoder/config_v.json | 4 +++- transcoder/debug/file_streamer.c | 25 ++++++++++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/transcoder/config_v.json b/transcoder/config_v.json index b0450bab..bde615a1 100644 --- a/transcoder/config_v.json +++ b/transcoder/config_v.json @@ -5,7 +5,9 @@ "activeStream": 0, "xduration": 9000000, "randomDataPercentage": 0, - "jumpOffsetSec": -60 + "jumpOffsetSec": -60, + "hiccupIntervalSec": 0, + "hiccupDurationSec": 0 }, "throttler": { "maxDataRate": 1.5, diff --git a/transcoder/debug/file_streamer.c b/transcoder/debug/file_streamer.c index 31223ae8..1e0671af 100644 --- a/transcoder/debug/file_streamer.c +++ b/transcoder/debug/file_streamer.c @@ -49,9 +49,14 @@ void* thread_stream_from_file(void *vargp) char channelId[KMP_MAX_CHANNEL_ID]; json_get_string(GetConfig(),"input.channelId","1_abcdefgh",channelId,sizeof(channelId)); - int jumpOffsetSec=0; + int64_t jumpOffsetSec=0; json_get_int64(GetConfig(),"input.jumpoffsetsec",0,&jumpOffsetSec); + int64_t hiccupDurationSec, hiccupIntervalSec; + json_get_int64(GetConfig(),"input.hiccupDurationSec",0,&hiccupDurationSec); + + json_get_int64(GetConfig(),"input.hiccupIntervalSec",0,&hiccupIntervalSec); + AVPacket packet; av_init_packet(&packet); @@ -86,8 +91,10 @@ void* thread_stream_from_file(void *vargp) LOGGER("SENDER",AV_LOG_INFO,"Realtime = %s",realTime ? "true" : "false"); srand((int)time(NULL)); uint64_t lastDts=0; - int64_t start_time=av_gettime_relative(); - + int64_t start_time=av_gettime_relative(), + hiccup_duration = hiccupDurationSec * 1000 * 1000, + hiccup_interval = hiccupIntervalSec * 1000 * 1000, + next_hiccup = start_time + hiccup_interval; samples_stats_t stats; sample_stats_init(&stats,standard_timebase); @@ -144,6 +151,18 @@ void* thread_stream_from_file(void *vargp) int64_t timePassed=av_rescale_q(packet.dts,standard_timebase,AV_TIME_BASE_Q) + start_time, clockPassed = av_gettime_relative(); + + if(clockPassed >= next_hiccup && clockPassed < next_hiccup + hiccup_duration) { + next_hiccup += hiccup_duration; + LOGGER("SENDER",AV_LOG_INFO,"hiccup! [ %ld - %ld ]", + ts2str((clockPassed - start_time) * 90,true), + ts2str((next_hiccup - start_time) * 90,true)); + + av_usleep(next_hiccup - clockPassed); + + next_hiccup = av_gettime_relative() + hiccup_interval; + } + LOGGER("SENDER",AV_LOG_DEBUG,"XXXX clockPassed=%ld timePassed=%ld", clockPassed - start_time,timePassed - start_time); while (clockPassed < timePassed) { From c812e2845ce6ea3fdcde7b4aff73584d94269234 Mon Sep 17 00:00:00 2001 From: igorshevach Date: Mon, 6 Nov 2023 23:19:08 +0200 Subject: [PATCH 4/8] - reduce cold (heat up) period to 0 --- transcoder/receiver_server.c | 3 +- transcoder/tests/config_a.json | 147 +++++++++++++++++++++++++++ transcoder/{ => tests}/config_v.json | 2 +- transcoder/utils/throttler.c | 16 +-- transcoder/utils/throttler.h | 2 +- 5 files changed, 159 insertions(+), 11 deletions(-) create mode 100644 transcoder/tests/config_a.json rename transcoder/{ => tests}/config_v.json (99%) diff --git a/transcoder/receiver_server.c b/transcoder/receiver_server.c index d04f5365..2800f76b 100644 --- a/transcoder/receiver_server.c +++ b/transcoder/receiver_server.c @@ -28,9 +28,8 @@ int atomFileWrite (char* fileName,char* content,size_t size) int processedFrameCB(receiver_server_session_t *session,bool completed) { uint64_t now=av_gettime(); - receiver_server_t *server=session->server; if (completed || now-session->lastStatsUpdated>session->diagnosticsIntervalInSeconds) {//1 second interval - + receiver_server_t *server=session->server; char* tmpBuf=av_malloc(MAX_DIAGNOSTICS_STRING_LENGTH); diff --git a/transcoder/tests/config_a.json b/transcoder/tests/config_a.json new file mode 100644 index 00000000..39ea6c9f --- /dev/null +++ b/transcoder/tests/config_a.json @@ -0,0 +1,147 @@ +{ + "input": { + "file": "/media/worng_order_audio_only.ts", + "realTime": false, + "activeStream": 0, + "zduration": 9000000, + "randomDataPercentage": 0, + "xhiccupIntervalSec": 5, + "xhiccupDurationSec": 2 + }, + "throttler": { + "maxDataRate": 1.5, + "coldSeconds": 0, + "minThrottleWaitMs": 1 + }, + "frameDropper1": { + "enabled": false, + "queueDuration": 10, + "queueSize": 0, + "nonKeyFrameDropperThreshold": 4, + "decodedFrameDropperThreshold": 2 + }, + "logger": { + "logLevels": ["DEBUG","VERBOSE","INFO","WARN","ERROR","FATAL","PANIC"], + "logLevel": "DEBUG" + }, + "kmp": { + "listenPort": 16543, + "listenAddress": "0.0.0.0", + "acceptTimeout": 15, + "idleTimeout": 10, + "connectTimeout": 10 + }, + "control": { + "listenPort": 18001, + "listenAddress": "0.0.0.0" + }, + "debug": { + "diagnosticsIntervalInSeconds": 1 + }, + "output": { + "saveFile": true, + "outputFileNamePattern": "./output_%s.ts", + "streamingUrla": "kmp://localhost:6543", + "streamingUrl12": "kmp://192.168.11.59:6543", + "streamingUrl1": "" + }, + "engine": { + "encoders": { + "h264": ["h264_nvenc","libx264","h264_videotoolbox"] + }, + "presets": { + "A": { + "h264_videotoolbox": "default", + "libx264": "veryfast", + "h264_nvenc": "fast" + } + } + }, + "errorPolicy": { + "exitOnError": false + }, + "outputTracks": [ + { + "trackId": "v32", + "enabled": false, + "passthrough": true + }, + { + "trackId": "a32", + "enabled": true, + "passthrough": true + }, + { + "trackId": "a33", + "enabled": true, + "bitrate": 64, + "passthrough": false, + "codec": "aac", + "audioParams": { + "samplingRate": -1, + "channels": 2 + } + }, + { + "trackId": "v33", + "passthrough": false, + "enabled": false, + "bitrate": 900, + "codec": "h264", + "videoParams": { + "profile": "main", + "preset": "A", + "height": 480 + } + }, + { + "trackId": "v34", + "enabled": false, + "passthrough": false, + "bitrate": 600, + "codec": "h264", + "videoParams": { + "profile": "baseline", + "preset": "A", + "height": 360 + } + }, + { + "trackId": "v35", + "enabled": false, + "passthrough": false, + "bitrate": 400, + "codec": "h264", + "videoParams": { + "profile": "baseline", + "preset": "A", + "height": 360 + } + }, + { + "trackId": "v42", + "enabled": false, + "passthrough": false, + "bitrate": 1500, + "codec": "h264", + "videoParams": { + "profile": "high", + "preset": "A", + "height": 720 + } + }, + { + "trackId": "v43", + "enabled": false, + "passthrough": false, + "bitrate": 2500, + "codec": "h264", + "videoParams": { + "profile": "high", + "preset": "A", + "height": 2160, + "skipFrame": 1 + } + } + ] +} diff --git a/transcoder/config_v.json b/transcoder/tests/config_v.json similarity index 99% rename from transcoder/config_v.json rename to transcoder/tests/config_v.json index bde615a1..8796e1eb 100644 --- a/transcoder/config_v.json +++ b/transcoder/tests/config_v.json @@ -11,7 +11,7 @@ }, "throttler": { "maxDataRate": 1.5, - "coldSeconds": 0, + "coldSeconds": 1, "minThrottleWaitMs": 1 }, "frameDropper1": { diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c index 6acac7dc..f08dc237 100644 --- a/transcoder/utils/throttler.c +++ b/transcoder/utils/throttler.c @@ -10,7 +10,7 @@ #include "json_parser.h" static void doThrottle(float maxDataRate, - int coldSeconds, + float coldSeconds, int minThrottleWaitMs, samples_stats_t *stats, AVRational targetFramerate); @@ -21,7 +21,7 @@ throttler_init(samples_stats_t *stats,throttler_t *throttler) { json_get_double(config,"throttler.maxDataRate",INFINITY,(double*)&throttler->maxDataRate); *(bool*)&throttler->enabled = throttler->maxDataRate < INFINITY; if(throttler->enabled){ - json_get_int(config,"throttler.coldSeconds",0,(int*)&throttler->coldSeconds); + json_get_double(config,"throttler.coldSeconds",0,(double*)&throttler->coldSeconds); json_get_int(config,"throttler.minThrottleWaitMs",1,(int*)&throttler->minThrottleWaitMs); throttler->stats = stats; } @@ -50,7 +50,7 @@ throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) static void doThrottle(float maxDataRate, - int coldSeconds, + float coldSeconds, int minThrottleWaitMs, samples_stats_t *stats, AVRational targetFramerate) @@ -61,11 +61,13 @@ doThrottle(float maxDataRate, samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"throttleThread-Stats"); - if(stats->totalFrames < coldSeconds * targetFramerate.num / targetFramerate.den) { + if(stats->totalFrames * targetFramerate.den < coldSeconds * targetFramerate.num ) { return; } - - currentDataRate = stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; + // when starting up use actual number of frames received so far and + // not the speed at which they accumulate + const currentFrameRate = __MIN(stats->totalFrames,stats->currentFrameRate); + currentDataRate = currentFrameRate * targetFramerate.den / (float)targetFramerate.num; LOGGER(CATEGORY_THROTTLER, AV_LOG_DEBUG,"throttleThread. data rate current: %.3f max: %.3f", @@ -78,7 +80,7 @@ doThrottle(float maxDataRate, const int minThrottleWaitMs = 1; if(throttleWaitUSec > minThrottleWaitMs * 1000) { LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"throttleThread. throttling %.3f ms",throttleWaitUSec / 1000.f); - stats->throttleWait += av_rescale_q( throttleWaitUSec, clockScale, standard_timebase); + stats->throttleWait += av_rescale_q(throttleWaitUSec, clockScale, standard_timebase); av_usleep(throttleWaitUSec); } } diff --git a/transcoder/utils/throttler.h b/transcoder/utils/throttler.h index c614ca92..1508ace7 100644 --- a/transcoder/utils/throttler.h +++ b/transcoder/utils/throttler.h @@ -7,7 +7,7 @@ typedef struct { const bool enabled; const double maxDataRate; - const int coldSeconds; + const double coldSeconds; const int minThrottleWaitMs; samples_stats_t *stats; } throttler_t; From 8e96bcc0bac2c56891c8f07a4ccf16312a85a70e Mon Sep 17 00:00:00 2001 From: igorshevach Date: Tue, 7 Nov 2023 23:15:47 +0200 Subject: [PATCH 5/8] - improve start up performance --- transcoder/tests/config_a.json | 2 +- transcoder/tests/config_v.json | 2 +- transcoder/utils/throttler.c | 55 ++++++++++++++++++++-------------- transcoder/utils/throttler.h | 5 ++-- 4 files changed, 36 insertions(+), 28 deletions(-) diff --git a/transcoder/tests/config_a.json b/transcoder/tests/config_a.json index 39ea6c9f..8f2043dd 100644 --- a/transcoder/tests/config_a.json +++ b/transcoder/tests/config_a.json @@ -10,7 +10,7 @@ }, "throttler": { "maxDataRate": 1.5, - "coldSeconds": 0, + "useStatsDataRate": false, "minThrottleWaitMs": 1 }, "frameDropper1": { diff --git a/transcoder/tests/config_v.json b/transcoder/tests/config_v.json index 8796e1eb..789bffb1 100644 --- a/transcoder/tests/config_v.json +++ b/transcoder/tests/config_v.json @@ -11,7 +11,7 @@ }, "throttler": { "maxDataRate": 1.5, - "coldSeconds": 1, + "useStatsDataRate": false, "minThrottleWaitMs": 1 }, "frameDropper1": { diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c index f08dc237..ca41f764 100644 --- a/transcoder/utils/throttler.c +++ b/transcoder/utils/throttler.c @@ -10,8 +10,8 @@ #include "json_parser.h" static void doThrottle(float maxDataRate, - float coldSeconds, - int minThrottleWaitMs, + bool useStatsDataRate, + double minThrottleWaitMs, samples_stats_t *stats, AVRational targetFramerate); @@ -19,10 +19,9 @@ int throttler_init(samples_stats_t *stats,throttler_t *throttler) { json_value_t *config = GetConfig(); json_get_double(config,"throttler.maxDataRate",INFINITY,(double*)&throttler->maxDataRate); - *(bool*)&throttler->enabled = throttler->maxDataRate < INFINITY; - if(throttler->enabled){ - json_get_double(config,"throttler.coldSeconds",0,(double*)&throttler->coldSeconds); - json_get_int(config,"throttler.minThrottleWaitMs",1,(int*)&throttler->minThrottleWaitMs); + if(throttler->maxDataRate < INFINITY){ + json_get_bool(config,"throttler.useStatsDataRate",false,(bool*)&throttler->useStatsDataRate); + json_get_double(config,"throttler.minThrottleWaitMs",1,(double*)&throttler->minThrottleWaitMs); throttler->stats = stats; } return 0; @@ -34,12 +33,14 @@ throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) const transcode_mediaInfo_t *mediaInfo = transcode_session ? transcode_session->currentMediaInfo : NULL; if(mediaInfo && mediaInfo->codecParams){ const bool isVideo = mediaInfo->codecParams->codec_type == AVMEDIA_TYPE_VIDEO; + // TODO: currently only AAC 'fps' is supported, MP3 and opus etc. + // may have a different frame allocation schemes const AVRational frameRate = isVideo ? mediaInfo->frameRate : (AVRational){ .num = mediaInfo->codecParams->sample_rate , .den = 1024 }; doThrottle(throttler->maxDataRate, - throttler->coldSeconds, + throttler->useStatsDataRate, throttler->minThrottleWaitMs, throttler->stats, frameRate); @@ -50,36 +51,44 @@ throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) static void doThrottle(float maxDataRate, - float coldSeconds, - int minThrottleWaitMs, + bool useStatsDataRate, + double minThrottleWaitMs, samples_stats_t *stats, AVRational targetFramerate) { if(targetFramerate.den > 0 && targetFramerate.num > 0) { - float currentDataRate; + const double currentDataRate = useStatsDataRate ? stats->currentRate : + stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; - samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"throttleThread-Stats"); - - if(stats->totalFrames * targetFramerate.den < coldSeconds * targetFramerate.num ) { - return; - } - // when starting up use actual number of frames received so far and - // not the speed at which they accumulate - const currentFrameRate = __MIN(stats->totalFrames,stats->currentFrameRate); - currentDataRate = currentFrameRate * targetFramerate.den / (float)targetFramerate.num; + samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"Throttle-Stats"); LOGGER(CATEGORY_THROTTLER, - AV_LOG_DEBUG,"throttleThread. data rate current: %.3f max: %.3f", + AV_LOG_DEBUG,"%s. data rate current: %.3f max: %.3f", + __FUNCTION__, currentDataRate, maxDataRate); if(currentDataRate > maxDataRate) { // going to sleep for a period of time gained due to race - int throttleWaitUSec = (currentDataRate - maxDataRate) * 1000 * 1000; //av_rescale(1000 * 1000,targetFramerate.den,targetFramerate.num); - const int minThrottleWaitMs = 1; + int throttleWindowUs; + if(stats->totalFrames * targetFramerate.den < targetFramerate.num) { + // during startup frame rate is not stable and usually is high + // due to system delays related to various factors. therefore. + // we must work with high pressures in small intervals of time. + // In order to not overshoot we take smaller intervals proportional to + // time passed since beginning. + throttleWindowUs = av_rescale_q(1000*1000, + (AVRational){stats->totalFrames,1}, + targetFramerate); + } else { + throttleWindowUs = 1000 * 1000; + } + int throttleWaitUSec = (currentDataRate - maxDataRate) * throttleWindowUs; if(throttleWaitUSec > minThrottleWaitMs * 1000) { - LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"throttleThread. throttling %.3f ms",throttleWaitUSec / 1000.f); + LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"%s. throttling %.3f ms", + __FUNCTION__, + throttleWaitUSec / 1000.f); stats->throttleWait += av_rescale_q(throttleWaitUSec, clockScale, standard_timebase); av_usleep(throttleWaitUSec); } diff --git a/transcoder/utils/throttler.h b/transcoder/utils/throttler.h index 1508ace7..43eae41f 100644 --- a/transcoder/utils/throttler.h +++ b/transcoder/utils/throttler.h @@ -5,10 +5,9 @@ #define Throttler_h typedef struct { - const bool enabled; + const bool useStatsDataRate; const double maxDataRate; - const double coldSeconds; - const int minThrottleWaitMs; + const double minThrottleWaitMs; samples_stats_t *stats; } throttler_t; From 820a60500e1ddac14ce7c6344f49fa480fcadc2b Mon Sep 17 00:00:00 2001 From: igorshevach Date: Wed, 8 Nov 2023 00:44:12 +0200 Subject: [PATCH 6/8] - implement fallback on dts based rate estimation in case codec is not supported (so far != aac) --- transcoder/utils/throttler.c | 123 ++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 45 deletions(-) diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c index ca41f764..fbffd868 100644 --- a/transcoder/utils/throttler.c +++ b/transcoder/utils/throttler.c @@ -9,12 +9,18 @@ #include "throttler.h" #include "json_parser.h" +// forward declarations static void doThrottle(float maxDataRate, - bool useStatsDataRate, double minThrottleWaitMs, samples_stats_t *stats, AVRational targetFramerate); +static +bool getFrameRateFromMediaInfo( + const transcode_mediaInfo_t *mediaInfo, + AVRational *result); + +// api int throttler_init(samples_stats_t *stats,throttler_t *throttler) { json_value_t *config = GetConfig(); @@ -32,15 +38,13 @@ throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) if(throttler && throttler->maxDataRate < INFINITY) { const transcode_mediaInfo_t *mediaInfo = transcode_session ? transcode_session->currentMediaInfo : NULL; if(mediaInfo && mediaInfo->codecParams){ - const bool isVideo = mediaInfo->codecParams->codec_type == AVMEDIA_TYPE_VIDEO; - // TODO: currently only AAC 'fps' is supported, MP3 and opus etc. - // may have a different frame allocation schemes - const AVRational frameRate = isVideo ? mediaInfo->frameRate : - (AVRational){ .num = mediaInfo->codecParams->sample_rate , - .den = 1024 }; + AVRational frameRate = {0}; + + if(!throttler->useStatsDataRate) { + getFrameRateFromMediaInfo(mediaInfo,&frameRate); + } doThrottle(throttler->maxDataRate, - throttler->useStatsDataRate, throttler->minThrottleWaitMs, throttler->stats, frameRate); @@ -48,50 +52,79 @@ throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) } } +// implementation: +static +bool +getFrameRateFromMediaInfo( + const transcode_mediaInfo_t *mediaInfo, + AVRational *result) { + if(AVMEDIA_TYPE_VIDEO == mediaInfo->codecParams->codec_type){ + *result = mediaInfo->frameRate; + return true; + } + else if(AV_CODEC_ID_AAC == mediaInfo->codecParams->codec_id){ + *result = (AVRational){ + .num = mediaInfo->codecParams->sample_rate , + .den = 960 // 1024 + }; + return true; + } + + LOGGER(CATEGORY_THROTTLER, + AV_LOG_DEBUG,"%s. unsupported (av) codec %d", + __FUNCTION__, + mediaInfo->codecParams->codec_id); + + return false; +} + +static +int64_t calculateThrottleWindow(samples_stats_t *stats,AVRational targetFramerate){ + if(targetFramerate.den == 0) { + if(stats->dtsPassed < 90000) { + return av_rescale(1000*1000,stats->dtsPassed , 90000); + } + } else if(stats->totalFrames * targetFramerate.den < targetFramerate.num) { + // during startup frame rate is not stable and usually is high + // due to system delays related to various factors. therefore. + // we must work with high pressures in small intervals of time. + // In order to not overshoot we take smaller intervals proportional to + // time passed since beginning. + return av_rescale_q(1000*1000, + (AVRational){stats->totalFrames,1}, + targetFramerate); + } + return 1000 * 1000; +} + static void doThrottle(float maxDataRate, - bool useStatsDataRate, double minThrottleWaitMs, samples_stats_t *stats, AVRational targetFramerate) { + const double currentDataRate = targetFramerate.den == 0 ? stats->currentRate : + stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; - if(targetFramerate.den > 0 && targetFramerate.num > 0) { - const double currentDataRate = useStatsDataRate ? stats->currentRate : - stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; - - samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"Throttle-Stats"); - - LOGGER(CATEGORY_THROTTLER, - AV_LOG_DEBUG,"%s. data rate current: %.3f max: %.3f", - __FUNCTION__, - currentDataRate, - maxDataRate); - - if(currentDataRate > maxDataRate) { - // going to sleep for a period of time gained due to race - int throttleWindowUs; - if(stats->totalFrames * targetFramerate.den < targetFramerate.num) { - // during startup frame rate is not stable and usually is high - // due to system delays related to various factors. therefore. - // we must work with high pressures in small intervals of time. - // In order to not overshoot we take smaller intervals proportional to - // time passed since beginning. - throttleWindowUs = av_rescale_q(1000*1000, - (AVRational){stats->totalFrames,1}, - targetFramerate); - } else { - throttleWindowUs = 1000 * 1000; - } - int throttleWaitUSec = (currentDataRate - maxDataRate) * throttleWindowUs; - if(throttleWaitUSec > minThrottleWaitMs * 1000) { - LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"%s. throttling %.3f ms", - __FUNCTION__, - throttleWaitUSec / 1000.f); - stats->throttleWait += av_rescale_q(throttleWaitUSec, clockScale, standard_timebase); - av_usleep(throttleWaitUSec); - } + samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"Throttle-Stats"); + + LOGGER(CATEGORY_THROTTLER, + AV_LOG_DEBUG,"%s. data rate current: %.3f max: %.3f", + __FUNCTION__, + currentDataRate, + maxDataRate); + + if(currentDataRate > maxDataRate) { + // going to sleep for a period of time gained due to race + int64_t throttleWindowUs = calculateThrottleWindow(stats,targetFramerate); + int throttleWaitUSec = (currentDataRate - maxDataRate) * throttleWindowUs; + if(throttleWaitUSec > minThrottleWaitMs * 1000) { + LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"%s. throttling %.3f ms", + __FUNCTION__, + throttleWaitUSec / 1000.f); + stats->throttleWait += av_rescale_q(throttleWaitUSec, clockScale, standard_timebase); + av_usleep(throttleWaitUSec); } - } + } } \ No newline at end of file From 4762d7c98b6b5d43b27116ba8b9cd0634312d86c Mon Sep 17 00:00:00 2001 From: igorshevach Date: Wed, 8 Nov 2023 00:48:50 +0200 Subject: [PATCH 7/8] - typo --- transcoder/utils/throttler.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c index fbffd868..d63a6f3c 100644 --- a/transcoder/utils/throttler.c +++ b/transcoder/utils/throttler.c @@ -80,16 +80,16 @@ getFrameRateFromMediaInfo( static int64_t calculateThrottleWindow(samples_stats_t *stats,AVRational targetFramerate){ + // during startup frame rate is not stable and usually is high + // due to system delays related to various factors. therefore. + // we must work with high pressures in small intervals of time. + // In order to not overshoot we take smaller intervals proportional to + // time passed since beginning. if(targetFramerate.den == 0) { if(stats->dtsPassed < 90000) { return av_rescale(1000*1000,stats->dtsPassed , 90000); } } else if(stats->totalFrames * targetFramerate.den < targetFramerate.num) { - // during startup frame rate is not stable and usually is high - // due to system delays related to various factors. therefore. - // we must work with high pressures in small intervals of time. - // In order to not overshoot we take smaller intervals proportional to - // time passed since beginning. return av_rescale_q(1000*1000, (AVRational){stats->totalFrames,1}, targetFramerate); From 7acde1fac7aba0f0d33a919545368fdd3c9ba179 Mon Sep 17 00:00:00 2001 From: igorshevach Date: Sun, 12 Nov 2023 19:32:37 +0200 Subject: [PATCH 8/8] - limit throttle time by min allowed frame duration = 1sec / (max data rate * targetFrameRate) --- transcoder/utils/throttler.c | 1 + 1 file changed, 1 insertion(+) diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c index d63a6f3c..07f9ec78 100644 --- a/transcoder/utils/throttler.c +++ b/transcoder/utils/throttler.c @@ -119,6 +119,7 @@ doThrottle(float maxDataRate, // going to sleep for a period of time gained due to race int64_t throttleWindowUs = calculateThrottleWindow(stats,targetFramerate); int throttleWaitUSec = (currentDataRate - maxDataRate) * throttleWindowUs; + throttleWaitUSec = __MIN(throttleWaitUSec, av_rescale(1000*1000,targetFramerate.den,targetFramerate.num) / maxDataRate); if(throttleWaitUSec > minThrottleWaitMs * 1000) { LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"%s. throttling %.3f ms", __FUNCTION__,