Skip to content

Commit

Permalink
Merge pull request #181 from kaltura/master-throttle-transcoder
Browse files Browse the repository at this point in the history
LIV-1205: Transcoder overload protection.
  • Loading branch information
david-winder-kaltura authored Nov 13, 2023
2 parents 94f6723 + 7acde1f commit b574371
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 12 deletions.
2 changes: 1 addition & 1 deletion transcoder/common/json_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,6 @@ json_status_t json_get_double(const json_value_t* obj,char* path,double defaultV
if (jresult->type!=JSON_FRAC) {
return JSON_BAD_DATA;
}
*result = ((double)jresult->v.num.denom) / ((double)jresult->v.num.num);
*result = ((double)jresult->v.num.num) / ((double)jresult->v.num.denom);
return JSON_OK;
}
35 changes: 28 additions & 7 deletions transcoder/debug/file_streamer.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ void* thread_stream_from_file(void *vargp)
char channelId[KMP_MAX_CHANNEL_ID];
json_get_string(GetConfig(),"input.channelId","1_abcdefgh",channelId,sizeof(channelId));

int jumpOffsetSec=0;
int64_t jumpOffsetSec=0;
json_get_int64(GetConfig(),"input.jumpoffsetsec",0,&jumpOffsetSec);

int64_t hiccupDurationSec, hiccupIntervalSec;
json_get_int64(GetConfig(),"input.hiccupDurationSec",0,&hiccupDurationSec);

json_get_int64(GetConfig(),"input.hiccupIntervalSec",0,&hiccupIntervalSec);

AVPacket packet;
av_init_packet(&packet);

Expand Down Expand Up @@ -86,8 +91,10 @@ void* thread_stream_from_file(void *vargp)
LOGGER("SENDER",AV_LOG_INFO,"Realtime = %s",realTime ? "true" : "false");
srand((int)time(NULL));
uint64_t lastDts=0;
int64_t start_time=av_gettime_relative();

int64_t start_time=av_gettime_relative(),
hiccup_duration = hiccupDurationSec * 1000 * 1000,
hiccup_interval = hiccupIntervalSec * 1000 * 1000,
next_hiccup = start_time + hiccup_interval;

samples_stats_t stats;
sample_stats_init(&stats,standard_timebase);
Expand Down Expand Up @@ -143,12 +150,26 @@ void* thread_stream_from_file(void *vargp)

if (realTime && lastDts > 0) {

int64_t timePassed=av_rescale_q(packet.dts-lastDts,standard_timebase,AV_TIME_BASE_Q);
//LOGGER("SENDER",AV_LOG_DEBUG,"XXXX dt=%ld dd=%ld", (av_gettime_relative() - start_time),timePassed);
while ((av_gettime_relative() - start_time) < timePassed) {
int64_t timePassed=av_rescale_q(packet.dts,standard_timebase,AV_TIME_BASE_Q) + start_time,
clockPassed = av_gettime_relative();

if(clockPassed >= next_hiccup && clockPassed < next_hiccup + hiccup_duration) {
next_hiccup += hiccup_duration;
LOGGER("SENDER",AV_LOG_INFO,"hiccup! [ %ld - %ld ]",
ts2str((clockPassed - start_time) * 90,true),
ts2str((next_hiccup - start_time) * 90,true));

av_usleep(next_hiccup - clockPassed);

next_hiccup = av_gettime_relative() + hiccup_interval;
}

LOGGER("SENDER",AV_LOG_DEBUG,"XXXX clockPassed=%ld timePassed=%ld", clockPassed - start_time,timePassed - start_time);
while (clockPassed < timePassed) {

LOGGER0("SENDER",AV_LOG_DEBUG,"XXXX Sleep 10ms");
av_usleep(10*1000);//10ms
clockPassed = av_gettime_relative();
}
}

Expand Down Expand Up @@ -180,7 +201,7 @@ void* thread_stream_from_file(void *vargp)
LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d",
ts2str(packet.pts,true),
ts2str(packet.dts,true),
packet.dts,packet.size);
packet.size);


av_packet_unref(&packet);
Expand Down
6 changes: 6 additions & 0 deletions transcoder/receiver_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "receiver_server.h"
#include "KMP/KMP.h"
#include "transcode_session.h"
#include "utils/throttler.h"


int atomFileWrite (char* fileName,char* content,size_t size)
Expand Down Expand Up @@ -67,9 +68,11 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
bool autoAckMode;
uint64_t received_frame_id=0;
kmp_frame_position_t current_position;
throttler_t throttler = {0};

json_get_bool(GetConfig(),"autoAckModeEnabled",false,&autoAckMode);

_S(throttler_init(&server->receiverStats,&throttler));

while (retVal >= 0 && session->kmpClient.socket) {

Expand Down Expand Up @@ -119,6 +122,9 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section
samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size);
pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section

throttler_process(&throttler,transcode_session);

if(add_packet_frame_id_and_pts(packet,received_frame_id,packet->pts)){
LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id);
}
Expand Down
147 changes: 147 additions & 0 deletions transcoder/tests/config_a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
{
"input": {
"file": "/media/worng_order_audio_only.ts",
"realTime": false,
"activeStream": 0,
"zduration": 9000000,
"randomDataPercentage": 0,
"xhiccupIntervalSec": 5,
"xhiccupDurationSec": 2
},
"throttler": {
"maxDataRate": 1.5,
"useStatsDataRate": false,
"minThrottleWaitMs": 1
},
"frameDropper1": {
"enabled": false,
"queueDuration": 10,
"queueSize": 0,
"nonKeyFrameDropperThreshold": 4,
"decodedFrameDropperThreshold": 2
},
"logger": {
"logLevels": ["DEBUG","VERBOSE","INFO","WARN","ERROR","FATAL","PANIC"],
"logLevel": "DEBUG"
},
"kmp": {
"listenPort": 16543,
"listenAddress": "0.0.0.0",
"acceptTimeout": 15,
"idleTimeout": 10,
"connectTimeout": 10
},
"control": {
"listenPort": 18001,
"listenAddress": "0.0.0.0"
},
"debug": {
"diagnosticsIntervalInSeconds": 1
},
"output": {
"saveFile": true,
"outputFileNamePattern": "./output_%s.ts",
"streamingUrla": "kmp://localhost:6543",
"streamingUrl12": "kmp://192.168.11.59:6543",
"streamingUrl1": ""
},
"engine": {
"encoders": {
"h264": ["h264_nvenc","libx264","h264_videotoolbox"]
},
"presets": {
"A": {
"h264_videotoolbox": "default",
"libx264": "veryfast",
"h264_nvenc": "fast"
}
}
},
"errorPolicy": {
"exitOnError": false
},
"outputTracks": [
{
"trackId": "v32",
"enabled": false,
"passthrough": true
},
{
"trackId": "a32",
"enabled": true,
"passthrough": true
},
{
"trackId": "a33",
"enabled": true,
"bitrate": 64,
"passthrough": false,
"codec": "aac",
"audioParams": {
"samplingRate": -1,
"channels": 2
}
},
{
"trackId": "v33",
"passthrough": false,
"enabled": false,
"bitrate": 900,
"codec": "h264",
"videoParams": {
"profile": "main",
"preset": "A",
"height": 480
}
},
{
"trackId": "v34",
"enabled": false,
"passthrough": false,
"bitrate": 600,
"codec": "h264",
"videoParams": {
"profile": "baseline",
"preset": "A",
"height": 360
}
},
{
"trackId": "v35",
"enabled": false,
"passthrough": false,
"bitrate": 400,
"codec": "h264",
"videoParams": {
"profile": "baseline",
"preset": "A",
"height": 360
}
},
{
"trackId": "v42",
"enabled": false,
"passthrough": false,
"bitrate": 1500,
"codec": "h264",
"videoParams": {
"profile": "high",
"preset": "A",
"height": 720
}
},
{
"trackId": "v43",
"enabled": false,
"passthrough": false,
"bitrate": 2500,
"codec": "h264",
"videoParams": {
"profile": "high",
"preset": "A",
"height": 2160,
"skipFrame": 1
}
}
]
}
11 changes: 9 additions & 2 deletions transcoder/config_v.json → transcoder/tests/config_v.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
{
"input": {
"file": "/media/worng_order_video_only.mp4",
"realTime": true,
"realTime": false,
"activeStream": 0,
"xduration": 9000000,
"randomDataPercentage": 0,
"jumpOffsetSec": -60
"jumpOffsetSec": -60,
"hiccupIntervalSec": 0,
"hiccupDurationSec": 0
},
"throttler": {
"maxDataRate": 1.5,
"useStatsDataRate": false,
"minThrottleWaitMs": 1
},
"frameDropper1": {
"enabled": false,
Expand Down
1 change: 1 addition & 0 deletions transcoder/utils/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define CATEGORY_RECEIVER "RECEIVER"
#define CATEGORY_KMP "KMP"
#define CATEGORY_HTTP_SERVER "HTTPSERVER"
#define CATEGORY_THROTTLER "THROTTLER"

void logger1(const char* category,int level,const char *fmt, ...);
void loggerFlush();
Expand Down
9 changes: 7 additions & 2 deletions transcoder/utils/samples_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void sample_stats_init(samples_stats_t* pStats,AVRational basetime)
pStats->firstTimeStamp=0;
pStats->lastTimeStamp=0;
pStats->lastDts=0;
pStats->throttleWait=0;
}

void drain(samples_stats_t* pStats,uint64_t clock)
Expand Down Expand Up @@ -102,12 +103,15 @@ void sample_stats_get_diagnostics(samples_stats_t *pStats,json_writer_ctx_t js)
JSON_SERIALIZE_STRING("firstTimeStamp",pStats->firstTimeStamp>0 ? ts2str(pStats->firstTimeStamp,false): "N/A")
JSON_SERIALIZE_STRING("lastTimeStamp",pStats->lastTimeStamp>0 ? ts2str(pStats->lastTimeStamp,false) : "N/A")
JSON_SERIALIZE_INT64("lastDts",pStats->lastDts)
if(pStats->throttleWait > 0) {
JSON_SERIALIZE_INT64("throttle",pStats->throttleWait)
}
}


void samples_stats_log(const char* category,int level,samples_stats_t *stats,const char *prefix)
{
LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf",
LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf throttleWait %s",
prefix,
stats->totalFrames,
stats->totalErrors,
Expand All @@ -116,5 +120,6 @@ void samples_stats_log(const char* category,int level,samples_stats_t *stats,con
pts2str(stats->clockDrift),
((double)stats->currentBitRate)/(1000.0),
stats->currentFrameRate,
stats->currentRate)
stats->currentRate,
pts2str(stats->throttleWait))
}
1 change: 1 addition & 0 deletions transcoder/utils/samples_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ typedef struct
uint64_t firstTimeStamp,lastTimeStamp;
int64_t timeStampPassed;
int64_t clockDrift;
int64_t throttleWait;
} samples_stats_t;

void sample_stats_init(samples_stats_t* pStats,AVRational basetime);
Expand Down
Loading

0 comments on commit b574371

Please sign in to comment.