Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime link stability timeout for main/backup #1775

Merged
merged 7 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ endforeach()
# SRT_DEBUG_TSBPD_WRAP 1 /* Debug packet timestamp wraparound */
# SRT_DEBUG_TLPKTDROP_DROPSEQ 1
# SRT_DEBUG_SNDQ_HIGHRATE 1
# SRT_DEBUG_BONDING_STATES 1
# SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */

# option defaults
Expand Down
6 changes: 5 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,12 @@ class CUDT

bool isOPT_TsbPd() const { return m_bOPT_TsbPd; }
int RTT() const { return m_iRTT; }
int RTTVar() const { return m_iRTTVar; }
int32_t sndSeqNo() const { return m_iSndCurrSeqNo; }
int32_t schedSeqNo() const { return m_iSndNextSeqNo; }
bool overrideSndSeqNo(int32_t seq);
srt::sync::steady_clock::time_point LastRspTime() const { return m_tsLastRspTime; }
srt::sync::steady_clock::time_point FreshActivationStart() const { return m_tsFreshActivation; }
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
int flowWindowSize() const { return m_iFlowWindowSize; }
Expand All @@ -385,7 +388,8 @@ class CUDT
int64_t maxBandwidth() const { return m_llMaxBW; }
int MSS() const { return m_iMSS; }

uint32_t latency_us() const {return m_iTsbPdDelay_ms*1000; }
uint32_t peer_latency_us() const {return m_iPeerTsbPdDelay_ms * 1000; }
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
int peer_idle_tout_ms() const { return m_iOPT_PeerIdleTimeout; }
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
size_t maxPayloadSize() const { return m_iMaxSRTPayloadSize; }
size_t OPT_PayloadSize() const { return m_zOPT_ExpPayloadSize; }
int sndLossLength() { return m_pSndLossList->getLossLength(); }
Expand Down
192 changes: 119 additions & 73 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2969,6 +2969,112 @@ void CUDTGroup::sendBackup_CheckIdleTime(gli_t w_d)
}
}

#if SRT_DEBUG_BONDING_STATES
class StabilityTracer
{
public:
StabilityTracer()
{
}

~StabilityTracer()
{
srt::sync::ScopedLock lck(m_mtx);
m_fout.close();
}

void trace(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint32_t activation_period_us,
int64_t stability_tmo_us, const std::string& state, uint16_t weight)
{
srt::sync::ScopedLock lck(m_mtx);
create_file();

m_fout << srt::sync::FormatTime(currtime) << ",";
m_fout << u.id() << ",";
m_fout << weight << ",";
m_fout << u.peer_latency_us() << ",";
m_fout << u.RTT() << ",";
m_fout << u.RTTVar() << ",";
m_fout << stability_tmo_us << ",";
m_fout << count_microseconds(currtime - u.LastRspTime()) << ",";
m_fout << state << ",";
m_fout << (srt::sync::is_zero(u.FreshActivationStart()) ? -1 : (count_microseconds(currtime - u.FreshActivationStart()))) << ",";
m_fout << activation_period_us << "\n";
m_fout.flush();
}

private:
void print_header()
{
//srt::sync::ScopedLock lck(m_mtx);
m_fout << "Timepoint,SocketID,weight,usLatency,usRTT,usRTTVar,usStabilityTimeout,usSinceLastResp,State,usSinceActivation,usActivationPeriod\n";
}

void create_file()
{
if (m_fout)
return;

std::string str_tnow = srt::sync::FormatTimeSys(srt::sync::steady_clock::now());
str_tnow.resize(str_tnow.size() - 6); // remove trailing ' [SYS]' part
while (str_tnow.find(':') != std::string::npos) {
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "stability_trace_" + str_tnow + ".csv";
m_fout.open(fname, std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";

print_header();
}

private:
srt::sync::Mutex m_mtx;
std::ofstream m_fout;
};

StabilityTracer s_stab_trace;
#endif

/// TODO: Remove 'weight' parameter? Only needed for logging.
/// @retval 1 - link is identified as stable
/// @retval 0 - link state remains unchanged (too early to identify, still in activation phase)
/// @retval -1 - link is identified as unstable
static int sendBackup_CheckRunningLinkStable(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint16_t weight)
{
const uint32_t latency_us = u.peer_latency_us();
const int32_t min_stability_us = 60000; // Minimum Link Stability Timeout: 60ms.
const int64_t initial_stabtout_us = max<int64_t>(min_stability_us, latency_us);
const int64_t activation_period_us = initial_stabtout_us + 5 * CUDT::COMM_SYN_INTERVAL_US;

// RTT and RTTVar values are still being refined during activation period,
// therefore the dymanic timeout should not be used in activation phase.
const bool is_activation_phase = !is_zero(u.FreshActivationStart())
&& (count_microseconds(currtime - u.FreshActivationStart()) <= activation_period_us);

const int peer_idle_tout_us = u.peer_idle_tout_ms() * 1000;

const int64_t stability_tout_us = is_activation_phase
? initial_stabtout_us // activation phase
: min<int64_t>(max<int64_t>(min_stability_us, 2 * u.RTT() + 4 * u.RTTVar()), latency_us);

const steady_clock::duration td_response = currtime - u.LastRspTime();
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
if (count_microseconds(td_response) > stability_tout_us)
{
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(u, currtime, activation_period_us, stability_tout_us, is_activation_phase ? "ACTIVATION-UNSTABLE" : "UNSTABLE", weight);
#endif
return -1;
}

// u.LastRspTime() > currtime is alwats true due to the very first check above in this function
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(u, currtime, activation_period_us, stability_tout_us, is_activation_phase ? "ACTIVATION" : "STABLE", weight);
#endif
return is_activation_phase ? 0 : 1;
}


// [[using locked(this->m_GroupLock)]]
bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point currtime)
{
Expand All @@ -2985,8 +3091,6 @@ bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point
// negative value is relatively easy, while introducing a mutex would only add a
// deadlock risk and performance degradation.

bool is_stable = true;

HLOGC(gslog.Debug,
log << "grp/sendBackup: CHECK STABLE: @" << d->id
<< ": TIMEDIFF {response= " << FormatDuration<DUNIT_MS>(currtime - u.m_tsLastRspTime)
Expand All @@ -2996,89 +3100,33 @@ bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point
<< (!is_zero(u.m_tsUnstableSince) ? FormatDuration<DUNIT_MS>(currtime - u.m_tsUnstableSince) : "NEVER")
<< "}");

if (currtime > u.m_tsLastRspTime)
{
// The last response predates the start of this function, look at the difference
const steady_clock::duration td_responsive = currtime - u.m_tsLastRspTime;
bool check_stability = true;

if (!is_zero(u.m_tsFreshActivation) && u.m_tsFreshActivation < currtime)
{
// The link is temporary-activated. Calculate then since the activation time.

// Check the last received ACK time first. This time is initialized with 'now'
// at the CUDT::open call, so you can't count on the trap zero time here, but
// it's still possible to check if activation time predates the ACK time. Things
// are here in the following possible order:
//
// - ACK time (old because defined at open)
// - Response time (old because the time of received handshake or keepalive counts)
// ... long time nothing ...
// - Activation time.
//
// If we have this situation, we have to wait for at least one ACK that is
// newer than activation time. However, if in this situation we have a fresh
// response, that is:
//
// - ACK time
// ...
// - Activation time
// - Response time (because a Keepalive had a caprice to come accidentally after sending)
//
// We still wait for a situation that there's at least one ACK that is newer than activation.

// As we DO have activation time, we need to check if there's at least
// one ACK newer than activation, that is, td_acked < td_active
if (u.m_tsLastRspAckTime < u.m_tsFreshActivation)
{
check_stability = false;
HLOGC(gslog.Debug,
log << "grp/sendBackup: link @" << d->id
<< " activated after ACK, "
"not checking for stability");
}
else
{
u.m_tsFreshActivation = steady_clock::time_point();
}
}

if (check_stability && count_microseconds(td_responsive) > m_uOPT_StabilityTimeout)
{
if (is_zero(u.m_tsUnstableSince))
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: socket NEW UNSTABLE: @" << d->id << " last heard "
<< FormatDuration(td_responsive) << " > " << m_uOPT_StabilityTimeout
<< " (stability timeout)");
// The link seems to have missed two ACKs already.
// Qualify this link as unstable
// Notify that it has been seen so since now
u.m_tsUnstableSince = currtime;
}

is_stable = false;
}
}
const int is_stable = sendBackup_CheckRunningLinkStable(u, currtime, d->weight);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you protect yourself against rapid changes in this value in case when, for example, RTT starts to increase dramatically?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RTT changes are smoothed by the IIR filter (first on the receiver side, then again on the sender side). So there won't be that high fluctuation. And if some fluctuations fall outside the stability threshold, that is an indicator that something unstable is happening on the link.


if (is_stable)
if (is_stable >= 0)
{
// If stability is ok, but unstable-since was set before, reset it.
HLOGC(gslog.Debug,
log << "grp/sendBackup: link STABLE: @" << d->id
<< (!is_zero(u.m_tsUnstableSince) ? " - RESTORED" : " - CONTINUED")
<< ", state RUNNING - will send a payload");

u.m_tsUnstableSince = steady_clock::time_point();

// For some cases
if (is_stable > 0)
u.m_tsFreshActivation = steady_clock::time_point();
}
else
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: link UNSTABLE for " << FormatDuration(currtime - u.m_tsUnstableSince) << " : @"
<< d->id << " - will send a payload");
if (is_zero(u.m_tsUnstableSince))
{
u.m_tsUnstableSince = currtime;
}
}

return is_stable;
return is_stable >= 0;
}

// [[using locked(this->m_GroupLock)]]
Expand Down Expand Up @@ -3801,12 +3849,10 @@ void CUDTGroup::sendBackup_SilenceRedundantLinks(vector<gli_t>& w_parallel)
}
CUDT& ce = d->ps->core();
steady_clock::duration td(0);
if (!is_zero(ce.m_tsFreshActivation) &&
count_microseconds(td = currtime - ce.m_tsFreshActivation) < ce.m_uOPT_StabilityTimeout)
if (!is_zero(ce.m_tsFreshActivation) && sendBackup_CheckRunningLinkStable(ce, currtime, d->weight) != 1)
{
HLOGC(gslog.Debug,
log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td) << " < "
<< ce.m_uOPT_StabilityTimeout << "(stability timeout)");
log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td));
continue;
}

Expand Down