diff --git a/transcoder/common/json_parser.c b/transcoder/common/json_parser.c index f8957af5..9e1c9536 100644 --- a/transcoder/common/json_parser.c +++ b/transcoder/common/json_parser.c @@ -1000,6 +1000,6 @@ json_status_t json_get_double(const json_value_t* obj,char* path,double defaultV if (jresult->type!=JSON_FRAC) { return JSON_BAD_DATA; } - *result = ((double)jresult->v.num.denom) / ((double)jresult->v.num.num); + *result = ((double)jresult->v.num.num) / ((double)jresult->v.num.denom); return JSON_OK; } diff --git a/transcoder/debug/file_streamer.c b/transcoder/debug/file_streamer.c index 914f0657..f9ec8232 100644 --- a/transcoder/debug/file_streamer.c +++ b/transcoder/debug/file_streamer.c @@ -49,9 +49,14 @@ void* thread_stream_from_file(void *vargp) char channelId[KMP_MAX_CHANNEL_ID]; json_get_string(GetConfig(),"input.channelId","1_abcdefgh",channelId,sizeof(channelId)); - int jumpOffsetSec=0; + int64_t jumpOffsetSec=0; json_get_int64(GetConfig(),"input.jumpoffsetsec",0,&jumpOffsetSec); + int64_t hiccupDurationSec, hiccupIntervalSec; + json_get_int64(GetConfig(),"input.hiccupDurationSec",0,&hiccupDurationSec); + + json_get_int64(GetConfig(),"input.hiccupIntervalSec",0,&hiccupIntervalSec); + AVPacket packet; av_init_packet(&packet); @@ -86,8 +91,10 @@ void* thread_stream_from_file(void *vargp) LOGGER("SENDER",AV_LOG_INFO,"Realtime = %s",realTime ? "true" : "false"); srand((int)time(NULL)); uint64_t lastDts=0; - int64_t start_time=av_gettime_relative(); - + int64_t start_time=av_gettime_relative(), + hiccup_duration = hiccupDurationSec * 1000 * 1000, + hiccup_interval = hiccupIntervalSec * 1000 * 1000, + next_hiccup = start_time + hiccup_interval; samples_stats_t stats; sample_stats_init(&stats,standard_timebase); @@ -143,12 +150,26 @@ void* thread_stream_from_file(void *vargp) if (realTime && lastDts > 0) { - int64_t timePassed=av_rescale_q(packet.dts-lastDts,standard_timebase,AV_TIME_BASE_Q); - //LOGGER("SENDER",AV_LOG_DEBUG,"XXXX dt=%ld dd=%ld", (av_gettime_relative() - start_time),timePassed); - while ((av_gettime_relative() - start_time) < timePassed) { + int64_t timePassed=av_rescale_q(packet.dts,standard_timebase,AV_TIME_BASE_Q) + start_time, + clockPassed = av_gettime_relative(); + + if(clockPassed >= next_hiccup && clockPassed < next_hiccup + hiccup_duration) { + next_hiccup += hiccup_duration; + LOGGER("SENDER",AV_LOG_INFO,"hiccup! [ %ld - %ld ]", + ts2str((clockPassed - start_time) * 90,true), + ts2str((next_hiccup - start_time) * 90,true)); + + av_usleep(next_hiccup - clockPassed); + + next_hiccup = av_gettime_relative() + hiccup_interval; + } + + LOGGER("SENDER",AV_LOG_DEBUG,"XXXX clockPassed=%ld timePassed=%ld", clockPassed - start_time,timePassed - start_time); + while (clockPassed < timePassed) { LOGGER0("SENDER",AV_LOG_DEBUG,"XXXX Sleep 10ms"); av_usleep(10*1000);//10ms + clockPassed = av_gettime_relative(); } } @@ -180,7 +201,7 @@ void* thread_stream_from_file(void *vargp) LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d", ts2str(packet.pts,true), ts2str(packet.dts,true), - packet.dts,packet.size); + packet.size); av_packet_unref(&packet); diff --git a/transcoder/receiver_server.c b/transcoder/receiver_server.c index 5b5529ae..2800f76b 100644 --- a/transcoder/receiver_server.c +++ b/transcoder/receiver_server.c @@ -9,6 +9,7 @@ #include "receiver_server.h" #include "KMP/KMP.h" #include "transcode_session.h" +#include "utils/throttler.h" int atomFileWrite (char* fileName,char* content,size_t size) @@ -67,9 +68,11 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran bool autoAckMode; uint64_t received_frame_id=0; kmp_frame_position_t current_position; + throttler_t throttler = {0}; json_get_bool(GetConfig(),"autoAckModeEnabled",false,&autoAckMode); + _S(throttler_init(&server->receiverStats,&throttler)); while (retVal >= 0 && session->kmpClient.socket) { @@ -119,6 +122,9 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size); pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section + + throttler_process(&throttler,transcode_session); + if(add_packet_frame_id_and_pts(packet,received_frame_id,packet->pts)){ LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id); } diff --git a/transcoder/tests/config_a.json b/transcoder/tests/config_a.json new file mode 100644 index 00000000..8f2043dd --- /dev/null +++ b/transcoder/tests/config_a.json @@ -0,0 +1,147 @@ +{ + "input": { + "file": "/media/worng_order_audio_only.ts", + "realTime": false, + "activeStream": 0, + "zduration": 9000000, + "randomDataPercentage": 0, + "xhiccupIntervalSec": 5, + "xhiccupDurationSec": 2 + }, + "throttler": { + "maxDataRate": 1.5, + "useStatsDataRate": false, + "minThrottleWaitMs": 1 + }, + "frameDropper1": { + "enabled": false, + "queueDuration": 10, + "queueSize": 0, + "nonKeyFrameDropperThreshold": 4, + "decodedFrameDropperThreshold": 2 + }, + "logger": { + "logLevels": ["DEBUG","VERBOSE","INFO","WARN","ERROR","FATAL","PANIC"], + "logLevel": "DEBUG" + }, + "kmp": { + "listenPort": 16543, + "listenAddress": "0.0.0.0", + "acceptTimeout": 15, + "idleTimeout": 10, + "connectTimeout": 10 + }, + "control": { + "listenPort": 18001, + "listenAddress": "0.0.0.0" + }, + "debug": { + "diagnosticsIntervalInSeconds": 1 + }, + "output": { + "saveFile": true, + "outputFileNamePattern": "./output_%s.ts", + "streamingUrla": "kmp://localhost:6543", + "streamingUrl12": "kmp://192.168.11.59:6543", + "streamingUrl1": "" + }, + "engine": { + "encoders": { + "h264": ["h264_nvenc","libx264","h264_videotoolbox"] + }, + "presets": { + "A": { + "h264_videotoolbox": "default", + "libx264": "veryfast", + "h264_nvenc": "fast" + } + } + }, + "errorPolicy": { + "exitOnError": false + }, + "outputTracks": [ + { + "trackId": "v32", + "enabled": false, + "passthrough": true + }, + { + "trackId": "a32", + "enabled": true, + "passthrough": true + }, + { + "trackId": "a33", + "enabled": true, + "bitrate": 64, + "passthrough": false, + "codec": "aac", + "audioParams": { + "samplingRate": -1, + "channels": 2 + } + }, + { + "trackId": "v33", + "passthrough": false, + "enabled": false, + "bitrate": 900, + "codec": "h264", + "videoParams": { + "profile": "main", + "preset": "A", + "height": 480 + } + }, + { + "trackId": "v34", + "enabled": false, + "passthrough": false, + "bitrate": 600, + "codec": "h264", + "videoParams": { + "profile": "baseline", + "preset": "A", + "height": 360 + } + }, + { + "trackId": "v35", + "enabled": false, + "passthrough": false, + "bitrate": 400, + "codec": "h264", + "videoParams": { + "profile": "baseline", + "preset": "A", + "height": 360 + } + }, + { + "trackId": "v42", + "enabled": false, + "passthrough": false, + "bitrate": 1500, + "codec": "h264", + "videoParams": { + "profile": "high", + "preset": "A", + "height": 720 + } + }, + { + "trackId": "v43", + "enabled": false, + "passthrough": false, + "bitrate": 2500, + "codec": "h264", + "videoParams": { + "profile": "high", + "preset": "A", + "height": 2160, + "skipFrame": 1 + } + } + ] +} diff --git a/transcoder/config_v.json b/transcoder/tests/config_v.json similarity index 93% rename from transcoder/config_v.json rename to transcoder/tests/config_v.json index 229b36e7..789bffb1 100644 --- a/transcoder/config_v.json +++ b/transcoder/tests/config_v.json @@ -1,11 +1,18 @@ { "input": { "file": "/media/worng_order_video_only.mp4", - "realTime": true, + "realTime": false, "activeStream": 0, "xduration": 9000000, "randomDataPercentage": 0, - "jumpOffsetSec": -60 + "jumpOffsetSec": -60, + "hiccupIntervalSec": 0, + "hiccupDurationSec": 0 + }, + "throttler": { + "maxDataRate": 1.5, + "useStatsDataRate": false, + "minThrottleWaitMs": 1 }, "frameDropper1": { "enabled": false, diff --git a/transcoder/utils/logger.h b/transcoder/utils/logger.h index dfd5f9fa..8ac370c8 100644 --- a/transcoder/utils/logger.h +++ b/transcoder/utils/logger.h @@ -20,6 +20,7 @@ #define CATEGORY_RECEIVER "RECEIVER" #define CATEGORY_KMP "KMP" #define CATEGORY_HTTP_SERVER "HTTPSERVER" +#define CATEGORY_THROTTLER "THROTTLER" void logger1(const char* category,int level,const char *fmt, ...); void loggerFlush(); diff --git a/transcoder/utils/samples_stats.c b/transcoder/utils/samples_stats.c index c99459fe..59fdfaea 100644 --- a/transcoder/utils/samples_stats.c +++ b/transcoder/utils/samples_stats.c @@ -24,6 +24,7 @@ void sample_stats_init(samples_stats_t* pStats,AVRational basetime) pStats->firstTimeStamp=0; pStats->lastTimeStamp=0; pStats->lastDts=0; + pStats->throttleWait=0; } void drain(samples_stats_t* pStats,uint64_t clock) @@ -102,12 +103,15 @@ void sample_stats_get_diagnostics(samples_stats_t *pStats,json_writer_ctx_t js) JSON_SERIALIZE_STRING("firstTimeStamp",pStats->firstTimeStamp>0 ? ts2str(pStats->firstTimeStamp,false): "N/A") JSON_SERIALIZE_STRING("lastTimeStamp",pStats->lastTimeStamp>0 ? ts2str(pStats->lastTimeStamp,false) : "N/A") JSON_SERIALIZE_INT64("lastDts",pStats->lastDts) + if(pStats->throttleWait > 0) { + JSON_SERIALIZE_INT64("throttle",pStats->throttleWait) + } } void samples_stats_log(const char* category,int level,samples_stats_t *stats,const char *prefix) { - LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf", + LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf throttleWait %s", prefix, stats->totalFrames, stats->totalErrors, @@ -116,5 +120,6 @@ void samples_stats_log(const char* category,int level,samples_stats_t *stats,con pts2str(stats->clockDrift), ((double)stats->currentBitRate)/(1000.0), stats->currentFrameRate, - stats->currentRate) + stats->currentRate, + pts2str(stats->throttleWait)) } diff --git a/transcoder/utils/samples_stats.h b/transcoder/utils/samples_stats.h index 0169bfaa..295c92ea 100644 --- a/transcoder/utils/samples_stats.h +++ b/transcoder/utils/samples_stats.h @@ -41,6 +41,7 @@ typedef struct uint64_t firstTimeStamp,lastTimeStamp; int64_t timeStampPassed; int64_t clockDrift; + int64_t throttleWait; } samples_stats_t; void sample_stats_init(samples_stats_t* pStats,AVRational basetime); diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c new file mode 100644 index 00000000..07f9ec78 --- /dev/null +++ b/transcoder/utils/throttler.c @@ -0,0 +1,131 @@ +// +// throttler.c +// live_transcoder +// +// + + +#include "../transcode/transcode_session.h" +#include "throttler.h" +#include "json_parser.h" + +// forward declarations +static void doThrottle(float maxDataRate, + double minThrottleWaitMs, + samples_stats_t *stats, + AVRational targetFramerate); + +static +bool getFrameRateFromMediaInfo( + const transcode_mediaInfo_t *mediaInfo, + AVRational *result); + +// api +int +throttler_init(samples_stats_t *stats,throttler_t *throttler) { + json_value_t *config = GetConfig(); + json_get_double(config,"throttler.maxDataRate",INFINITY,(double*)&throttler->maxDataRate); + if(throttler->maxDataRate < INFINITY){ + json_get_bool(config,"throttler.useStatsDataRate",false,(bool*)&throttler->useStatsDataRate); + json_get_double(config,"throttler.minThrottleWaitMs",1,(double*)&throttler->minThrottleWaitMs); + throttler->stats = stats; + } + return 0; +} + +void +throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) { + if(throttler && throttler->maxDataRate < INFINITY) { + const transcode_mediaInfo_t *mediaInfo = transcode_session ? transcode_session->currentMediaInfo : NULL; + if(mediaInfo && mediaInfo->codecParams){ + AVRational frameRate = {0}; + + if(!throttler->useStatsDataRate) { + getFrameRateFromMediaInfo(mediaInfo,&frameRate); + } + + doThrottle(throttler->maxDataRate, + throttler->minThrottleWaitMs, + throttler->stats, + frameRate); + } + } +} + +// implementation: +static +bool +getFrameRateFromMediaInfo( + const transcode_mediaInfo_t *mediaInfo, + AVRational *result) { + if(AVMEDIA_TYPE_VIDEO == mediaInfo->codecParams->codec_type){ + *result = mediaInfo->frameRate; + return true; + } + else if(AV_CODEC_ID_AAC == mediaInfo->codecParams->codec_id){ + *result = (AVRational){ + .num = mediaInfo->codecParams->sample_rate , + .den = 960 // 1024 + }; + return true; + } + + LOGGER(CATEGORY_THROTTLER, + AV_LOG_DEBUG,"%s. unsupported (av) codec %d", + __FUNCTION__, + mediaInfo->codecParams->codec_id); + + return false; +} + +static +int64_t calculateThrottleWindow(samples_stats_t *stats,AVRational targetFramerate){ + // during startup frame rate is not stable and usually is high + // due to system delays related to various factors. therefore. + // we must work with high pressures in small intervals of time. + // In order to not overshoot we take smaller intervals proportional to + // time passed since beginning. + if(targetFramerate.den == 0) { + if(stats->dtsPassed < 90000) { + return av_rescale(1000*1000,stats->dtsPassed , 90000); + } + } else if(stats->totalFrames * targetFramerate.den < targetFramerate.num) { + return av_rescale_q(1000*1000, + (AVRational){stats->totalFrames,1}, + targetFramerate); + } + return 1000 * 1000; +} + +static +void +doThrottle(float maxDataRate, + double minThrottleWaitMs, + samples_stats_t *stats, + AVRational targetFramerate) +{ + const double currentDataRate = targetFramerate.den == 0 ? stats->currentRate : + stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; + + samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"Throttle-Stats"); + + LOGGER(CATEGORY_THROTTLER, + AV_LOG_DEBUG,"%s. data rate current: %.3f max: %.3f", + __FUNCTION__, + currentDataRate, + maxDataRate); + + if(currentDataRate > maxDataRate) { + // going to sleep for a period of time gained due to race + int64_t throttleWindowUs = calculateThrottleWindow(stats,targetFramerate); + int throttleWaitUSec = (currentDataRate - maxDataRate) * throttleWindowUs; + throttleWaitUSec = __MIN(throttleWaitUSec, av_rescale(1000*1000,targetFramerate.den,targetFramerate.num) / maxDataRate); + if(throttleWaitUSec > minThrottleWaitMs * 1000) { + LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"%s. throttling %.3f ms", + __FUNCTION__, + throttleWaitUSec / 1000.f); + stats->throttleWait += av_rescale_q(throttleWaitUSec, clockScale, standard_timebase); + av_usleep(throttleWaitUSec); + } + } +} \ No newline at end of file diff --git a/transcoder/utils/throttler.h b/transcoder/utils/throttler.h new file mode 100644 index 00000000..43eae41f --- /dev/null +++ b/transcoder/utils/throttler.h @@ -0,0 +1,19 @@ +// live_transcoder +// + +#ifndef Throttler_h +#define Throttler_h + +typedef struct { + const bool useStatsDataRate; + const double maxDataRate; + const double minThrottleWaitMs; + samples_stats_t *stats; +} throttler_t; + +int throttler_init(samples_stats_t *stats,throttler_t *throttler); + +void +throttler_process(throttler_t *throttler,transcode_session_t *session); + +#endif \ No newline at end of file