Skip to content

Commit

Permalink
[core] Fix the activation and stability check criteria (#1449)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethouris authored Aug 11, 2020
1 parent c88ff88 commit 931aae0
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 40 deletions.
128 changes: 92 additions & 36 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13685,11 +13685,9 @@ void CUDTGroup::sendBackup_CheckIdleTime(gli_t w_d)
}
}

void CUDTGroup::sendBackup_CheckRunningStability(gli_t w_d, const time_point currtime, size_t& w_nunstable)
bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point currtime)
{
steady_clock::time_point ts_oldest_unstable;

CUDT& u = w_d->ps->core();
CUDT& u = d->ps->core();
// This link might be unstable, check its responsiveness status
// NOTE: currtime - last_rsp_time: we believe this value will be always positive as
// the Tk clock is believed to be monotonic. The resulting value
Expand All @@ -13704,38 +13702,70 @@ void CUDTGroup::sendBackup_CheckRunningStability(gli_t w_d, const time_point cur

bool is_unstable = false;

HLOGC(dlog.Debug, log << "grp/sendBackup: CHECK STABLE: @" << d->id
<< ": TIMEDIFF {response= "
<< FormatDuration<DUNIT_MS>(currtime - u.m_tsLastRspTime)
<< " ACK="
<< FormatDuration<DUNIT_MS>(currtime - u.m_tsLastRspAckTime)
<< " activation="
<< (!is_zero(u.m_tsTmpActiveTime) ? FormatDuration<DUNIT_MS>(currtime - u.m_tsTmpActiveTime) : "PAST")
<< " unstable="
<< (!is_zero(u.m_tsUnstableSince) ? FormatDuration<DUNIT_MS>(currtime - u.m_tsUnstableSince) : "NEVER")
<< "}");

if (currtime > u.m_tsLastRspTime)
{
// The last response predates the start of this function, look at the difference
steady_clock::duration td_responsive = currtime - u.m_tsLastRspTime;

IF_HEAVY_LOGGING(string source = "heard");

bool check_stability = true;

if (!is_zero(u.m_tsTmpActiveTime) && u.m_tsTmpActiveTime < currtime)
{
// The link is temporary-activated. Calculate then since the activation time.
// Mind that if the difference against the last update time is SMALLER,
// the temporary activation time should be cleared.
steady_clock::duration td_active = currtime - u.m_tsTmpActiveTime;

// Use the activation time, if it happened later than the last response.
// Still, check it against the timeout.
if (td_active < td_responsive)
// Check the last received ACK time first. This time is initialized with 'now'
// at the CUDT::open call, so you can't count on the trap zero time here, but
// it's still possible to check if activation time predates the ACK time. Things
// are here in the following possible order:
//
// - ACK time (old because defined at open)
// - Response time (old because the time of received handshake or keepalive counts)
// ... long time nothing ...
// - Activation time.
//
// If we have this situation, we have to wait for at least one ACK that is
// newer than activation time. However, if in this situation we have a fresh
// response, that is:
//
// - ACK time
// ...
// - Activation time
// - Response time (because a Keepalive had a caprice to come accidentally after sending)
//
// We still wait for a situation that there's at least one ACK that is newer than activation.

// As we DO have activation time, we need to check if there's at least
// one ACK newer than activation, that is, td_acked < td_active
if (u.m_tsLastRspAckTime < u.m_tsTmpActiveTime)
{
IF_HEAVY_LOGGING(source = "activated");
td_responsive = td_active;
check_stability = false;
HLOGC(dlog.Debug, log << "grp/sendBackup: link @" << d->id << " activated after ACK, "
"not checking for stability");
}
else
{
u.m_tsTmpActiveTime = steady_clock::time_point();
}
}

if (count_microseconds(td_responsive) > m_uOPT_StabilityTimeout)
if (check_stability && count_microseconds(td_responsive) > m_uOPT_StabilityTimeout)
{
if (is_zero(u.m_tsUnstableSince))
{
HLOGC(dlog.Debug, log << "grp/sendBackup: socket NEW UNSTABLE: @" << w_d->id
HLOGC(dlog.Debug, log << "grp/sendBackup: socket NEW UNSTABLE: @" << d->id
<< " last " << source << " " << FormatDuration(td_responsive)
<< " > " << m_uOPT_StabilityTimeout << " (stability timeout)");
// The link seems to have missed two ACKs already.
Expand All @@ -13751,32 +13781,34 @@ void CUDTGroup::sendBackup_CheckRunningStability(gli_t w_d, const time_point cur
if (!is_unstable)
{
// If stability is ok, but unstable-since was set before, reset it.
HLOGC(dlog.Debug, log << "grp/sendBackup: link STABLE: @" << w_d->id
<< (is_zero(u.m_tsUnstableSince) ? " - RESTORED" : " - CONTINUED")
<< " TIME now - updated: " << FormatDuration<DUNIT_MS>(currtime - u.m_tsLastRspTime));
HLOGC(dlog.Debug, log << "grp/sendBackup: link STABLE: @" << d->id
<< (!is_zero(u.m_tsUnstableSince) ? " - RESTORED" : " - CONTINUED"));

u.m_tsUnstableSince = steady_clock::time_point();
is_unstable = false;
}

#if ENABLE_HEAVY_LOGGING
// Could be set above
if (u.m_tsUnstableSince != steady_clock::time_point())
if (is_unstable)
{
HLOGC(dlog.Debug, log << "grp/sendBackup: link UNSTABLE for "
<< FormatDuration(currtime - u.m_tsUnstableSince) << " : @" << w_d->id << " - will send a payload");
// The link is already unstable
if (ts_oldest_unstable != steady_clock::time_point() || ts_oldest_unstable > u.m_tsUnstableSince)
ts_oldest_unstable = u.m_tsUnstableSince;
++w_nunstable;
<< FormatDuration(currtime - u.m_tsUnstableSince) << " : @" << d->id
<< " - will send a payload");
}
else
{
HLOGC(dlog.Debug, log << "grp/sendBackup: socket in RUNNING state: @" << w_d->id << " - will send a payload");
HLOGC(dlog.Debug, log << "grp/sendBackup: socket in RUNNING state: @" << d->id << " - will send a payload");
}
#endif

return !is_unstable;
}

bool CUDTGroup::sendBackup_CheckSendStatus(gli_t d, const steady_clock::time_point& currtime ATR_UNUSED,
const int stat, const int erc, const int32_t lastseq, const int32_t pktseq,
CUDT& w_u, int32_t& w_curseq, vector<gli_t>& w_parallel, int& w_final_stat,
set<int>& w_sendable_pri, size_t& w_nsuccessful, size_t& w_nunstable)
set<int>& w_sendable_pri, size_t& w_nsuccessful, bool& w_is_nunstable)
{
bool none_succeeded = true;

Expand Down Expand Up @@ -13846,8 +13878,7 @@ bool CUDTGroup::sendBackup_CheckSendStatus(gli_t d, const steady_clock::time_poi
else if (erc == SRT_EASYNCSND)
{
HLOGC(dlog.Debug, log << "grp/sendBackup: Link @" << w_u.m_SocketID << " DEEMED UNSTABLE (not ready to send)");
if (is_zero(w_u.m_tsUnstableSince)) // skip those unstable already - they are already counted
++w_nunstable;
w_is_nunstable = true;
}

return none_succeeded;
Expand Down Expand Up @@ -14128,10 +14159,10 @@ void CUDTGroup::send_CloseBrokenSockets(vector<gli_t>& w_wipeme)
w_wipeme.clear();
}

void CUDTGroup::sendBackup_CheckParallelLinks(const size_t nunstable, vector<gli_t>& w_parallel,
void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable, vector<gli_t>& w_parallel,
int& w_final_stat, bool& w_none_succeeded, SRT_MSGCTRL& w_mc, CUDTException& w_cx)
{
// In contradiction to redundancy sending, backup sending must check
// In contrast to broadcast sending, backup sending must check
// the blocking state in total first. We need this information through
// epoll because we didn't use all sockets to send the data hence the
// blocked socket information would not be complete.
Expand All @@ -14143,7 +14174,20 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const size_t nunstable, vector<gli
// If sending succeeded also over at least one unstable link (you only have
// unstable links and none other or others just got broken), continue sending
// anyway.
if (w_parallel.empty() && !nunstable)

#if ENABLE_HEAVY_LOGGING
// Potential problem to be checked in developer mode
for (vector<gli_t>::iterator p = w_parallel.begin(); p != w_parallel.end(); ++p)
{
if (std::find(unstable.begin(), unstable.end(), *p) != unstable.end())
{
LOGC(dlog.Debug, log << "grp/sendBackup: IPE: parallel links enclose unstable link @"
<< (*p)->ps->m_SocketID);
}
}
#endif

if (w_parallel.empty() && !unstable.empty())
{
// XXX FILL THE TABLE
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
Expand Down Expand Up @@ -14375,10 +14419,10 @@ int CUDTGroup::sendBackup(const char *buf, int len, SRT_MSGCTRL& w_mc)
vector<gli_t> wipeme;
vector<gli_t> idlers;
vector<gli_t> pending;
vector<gli_t> unstable;

// We need them as sets because links at first seen as stable
// may become unstable after a while
size_t nunstable = 0;
vector<gli_t> sendable;

int stat = 0;
Expand Down Expand Up @@ -14421,7 +14465,11 @@ int CUDTGroup::sendBackup(const char *buf, int len, SRT_MSGCTRL& w_mc)

if (d->sndstate == SRT_GST_RUNNING)
{
sendBackup_CheckRunningStability(d, (currtime), (nunstable));
if (!sendBackup_CheckRunningStability(d, (currtime)))
{
insert_uniq((unstable), d);
}
// Unstable links should still be used for sending.
sendable.push_back(d);
continue;
}
Expand Down Expand Up @@ -14499,9 +14547,13 @@ int CUDTGroup::sendBackup(const char *buf, int len, SRT_MSGCTRL& w_mc)
erc = e.getErrorCode();
}

bool is_unstable = false;
none_succeeded &= sendBackup_CheckSendStatus(d, currtime, stat, erc, lastseq, w_mc.pktseq,
(u), (curseq), (parallel), (final_stat),
(sendable_pri), (nsuccessful), (nunstable));
(sendable_pri), (nsuccessful), (is_unstable));

if (is_unstable && is_zero(u.m_tsUnstableSince)) // Add to unstable only if it wasn't unstable already
insert_uniq((unstable), d);

const Sendstate cstate = {d, stat, erc};
sendstates.push_back(cstate);
Expand Down Expand Up @@ -14589,7 +14641,11 @@ int CUDTGroup::sendBackup(const char *buf, int len, SRT_MSGCTRL& w_mc)

// CHECK: no sendable that exceeds unstable
// This embraces the case when there are no sendable at all.
bool need_activate = sendable.size() <= nunstable;
// Note that unstable links still count as sendable; they
// are simply links that were qualified for sending, but:
// - have exceeded response timeout
// - have hit EASYNCSND error during sending
bool need_activate = sendable.size() <= unstable.size();
string activate_reason;
IF_HEAVY_LOGGING(activate_reason = "BY NO REASON???");
if (need_activate)
Expand Down Expand Up @@ -14647,14 +14703,14 @@ int CUDTGroup::sendBackup(const char *buf, int len, SRT_MSGCTRL& w_mc)
else
{
HLOGC(dlog.Debug, log << "grp/sendBackup: have sendable links, stable="
<< (sendable.size() - nunstable) << " unstable=" << nunstable);
<< (sendable.size() - unstable.size()) << " unstable=" << unstable.size());
}

send_CheckPendingSockets(pending, (wipeme));

send_CloseBrokenSockets((wipeme));

sendBackup_CheckParallelLinks(nunstable, (parallel), (final_stat), (none_succeeded), (w_mc), (cx));
sendBackup_CheckParallelLinks(unstable, (parallel), (final_stat), (none_succeeded), (w_mc), (cx));

if (none_succeeded)
{
Expand Down
6 changes: 3 additions & 3 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,10 @@ class CUDTGroup
// Support functions for sendBackup and sendBroadcast
bool send_CheckIdle(const gli_t d, std::vector<gli_t>& w_wipeme, std::vector<gli_t>& w_pending);
void sendBackup_CheckIdleTime(gli_t w_d);
void sendBackup_CheckRunningStability(const gli_t d, const time_point currtime, size_t& w_nunstable);
bool sendBackup_CheckRunningStability(const gli_t d, const time_point currtime);
bool sendBackup_CheckSendStatus(const gli_t d, const time_point& currtime, const int stat, const int erc, const int32_t lastseq,
const int32_t pktseq, CUDT& w_u, int32_t& w_curseq, std::vector<gli_t>& w_parallel,
int& w_final_stat, std::set<int>& w_sendable_pri, size_t& w_nsuccessful, size_t& w_nunstable);
int& w_final_stat, std::set<int>& w_sendable_pri, size_t& w_nsuccessful, bool& w_is_unstable);
void sendBackup_Buffering(const char* buf, const int len, int32_t& curseq, SRT_MSGCTRL& w_mc);
void sendBackup_CheckNeedActivate(const std::vector<gli_t>& idlers, const char *buf, const int len,
bool& w_none_succeeded, SRT_MSGCTRL& w_mc, int32_t& w_curseq, int32_t& w_final_stat,
Expand All @@ -481,7 +481,7 @@ class CUDTGroup
const std::string& activate_reason);
void send_CheckPendingSockets(const std::vector<gli_t>& pending, std::vector<gli_t>& w_wipeme);
void send_CloseBrokenSockets(std::vector<gli_t>& w_wipeme);
void sendBackup_CheckParallelLinks(const size_t nunstable, std::vector<gli_t>& w_parallel,
void sendBackup_CheckParallelLinks(const std::vector<gli_t>& unstable, std::vector<gli_t>& w_parallel,
int& w_final_stat, bool& w_none_succeeded, SRT_MSGCTRL& w_mc, CUDTException& w_cx);

public:
Expand Down
13 changes: 12 additions & 1 deletion srtcore/utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ written by
#include <algorithm>
#include <bitset>
#include <map>
#include <vector>
#include <functional>
#include <memory>
#include <iomanip>
Expand Down Expand Up @@ -676,7 +677,7 @@ std::string PrintableMod(const Container& in, const std::string& prefix)
}

template<typename InputIterator, typename OutputIterator, typename TransFunction>
void FilterIf(InputIterator bg, InputIterator nd,
inline void FilterIf(InputIterator bg, InputIterator nd,
OutputIterator out, TransFunction fn)
{
for (InputIterator i = bg; i != nd; ++i)
Expand All @@ -688,6 +689,16 @@ void FilterIf(InputIterator bg, InputIterator nd,
}
}

// Appends `val` to `v` only when no element of `v` already compares
// equal to it; otherwise `v` is left untouched. The lookup is a linear
// std::find over the whole vector, and the relative order of existing
// elements is preserved.
template <class Value, class ArgValue>
inline void insert_uniq(std::vector<Value>& v, const ArgValue& val)
{
    const bool already_present = std::find(v.begin(), v.end(), val) != v.end();
    if (!already_present)
    {
        v.push_back(val);
    }
}

template <class Signature>
struct CallbackHolder
{
Expand Down

0 comments on commit 931aae0

Please sign in to comment.