Skip to content

Commit

Permalink
[core] Refactoring backup link activation
Browse files Browse the repository at this point in the history
maxsharabayko committed Jan 13, 2021
1 parent adaf323 commit 26adb8b
Showing 2 changed files with 120 additions and 89 deletions.
172 changes: 84 additions & 88 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
@@ -3046,7 +3046,7 @@ bool CUDTGroup::sendBackup_CheckSendStatus(gli_t
int& w_final_stat,
set<uint16_t>& w_sendable_pri,
size_t& w_nsuccessful,
bool& w_is_nunstable)
bool& w_is_unstable)
{
bool none_succeeded = true;

@@ -3118,7 +3118,7 @@ bool CUDTGroup::sendBackup_CheckSendStatus(gli_t
else if (erc == SRT_EASYNCSND)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: Link @" << w_u.m_SocketID << " DEEMED UNSTABLE (not ready to send)");
w_is_nunstable = true;
w_is_unstable = true;
}

return none_succeeded;
@@ -3175,19 +3175,72 @@ void CUDTGroup::sendBackup_Buffering(const char* buf, const int len, int32_t& w_
m_iLastSchedSeqNo = oldest_buffer_seq;
}

bool CUDTGroup::sendBackup_IsActivationNeeded(const vector<gli_t>& idlers,
const vector<gli_t>& unstable,
const vector<gli_t>& sendable,
const set<uint16_t> sendable_pri,
string& activate_reason ATR_UNUSED) const
{
SRT_ASSERT(sendable.size() >= unstable.size());
bool need_activate = sendable.size() <= unstable.size(); // <= for sanity, should be just =
IF_HEAVY_LOGGING(activate_reason = "BY NO REASON???");
if (need_activate)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: all " << sendable.size() << " links unstable - will activate an idle link");
IF_HEAVY_LOGGING(activate_reason = "no stable links");
}
else if (sendable_pri.empty() ||
(!idlers.empty() && FPriorityOrder::check(idlers[0]->weight, *sendable_pri.begin())))
{
need_activate = true;
#if ENABLE_HEAVY_LOGGING
if (sendable_pri.empty())
{
activate_reason = "no successful links found";
LOGC(gslog.Debug, log << "grp/sendBackup: no active links were successful - will activate an idle link");
}
else if (idlers.empty())
{
// This should be impossible.
activate_reason = "WEIRD (no idle links!)";
LOGC(gslog.Debug,
log << "grp/sendBackup: BY WEIRD AND IMPOSSIBLE REASON (IPE?) - will activate an idle link");
}
else
{
// Only now we are granted that both sendable_pri and idlers are nonempty
LOGC(gslog.Debug,
log << "grp/sendBackup: found link pri " << idlers[0]->weight << " PREF OVER "
<< (*sendable_pri.begin()) << " (highest from sendable) - will activate an idle link");
activate_reason = "found higher pri link";
}
#endif
}
else
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: sendable_pri (" << sendable_pri.size() << "): " << Printable(sendable_pri)
<< " first idle pri: " << (idlers.size() > 0 ? int(idlers[0]->weight) : -1)
<< " - will NOT activate an idle link");
}

return need_activate;
}

// [[using locked(this->m_GroupLock)]]
size_t CUDTGroup::sendBackup_CheckNeedActivate(const vector<gli_t>& idlers,
const char* buf,
const int len,
bool& w_none_succeeded,
SRT_MSGCTRL& w_mc,
int32_t& w_curseq,
int32_t& w_final_stat,
CUDTException& w_cx,
vector<Sendstate>& w_sendstates,
vector<gli_t>& w_parallel,
vector<SRTSOCKET>& w_wipeme,
const string& activate_reason ATR_UNUSED)
size_t CUDTGroup::sendBackup_TryActivateIdleLink(const vector<gli_t>& idlers,
const char* buf,
const int len,
bool& w_none_succeeded,
SRT_MSGCTRL& w_mc,
int32_t& w_curseq,
int32_t& w_final_stat,
CUDTException& w_cx,
vector<Sendstate>& w_sendstates,
vector<gli_t>& w_parallel,
vector<SRTSOCKET>& w_wipeme,
const string& activate_reason ATR_UNUSED)
{
int stat = -1;

@@ -3840,6 +3893,21 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
// Sort the idle sockets by priority so the highest priority idle links are checked first.
sort(idlers.begin(), idlers.end(), FPriorityOrder());

#if ENABLE_HEAVY_LOGGING
{
vector<SRTSOCKET> show_running, show_idle;
for (vector<gli_t>::iterator i = sendable.begin(); i != sendable.end(); ++i)
show_running.push_back((*i)->id);

for (vector<gli_t>::iterator i = idlers.begin(); i != idlers.end(); ++i)
show_idle.push_back((*i)->id);

LOGC(gslog.Debug,
log << "grp/sendBackup: RUNNING: " << PrintableMod(show_running, "@")
<< " IDLE: " << PrintableMod(show_idle, "@"));
}
#endif

vector<Sendstate> sendstates;

// Ok, we've separated the unstable from sendable just to know if:
@@ -3856,21 +3924,6 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
// priority should remain active.
vector<gli_t> parallel;

#if ENABLE_HEAVY_LOGGING
{
vector<SRTSOCKET> show_running, show_idle;
for (vector<gli_t>::iterator i = sendable.begin(); i != sendable.end(); ++i)
show_running.push_back((*i)->id);

for (vector<gli_t>::iterator i = idlers.begin(); i != idlers.end(); ++i)
show_idle.push_back((*i)->id);

LOGC(gslog.Debug,
log << "grp/sendBackup: RUNNING: " << PrintableMod(show_running, "@")
<< " IDLE: " << PrintableMod(show_idle, "@"));
}
#endif

int32_t curseq = SRT_SEQNO_NONE;
size_t nsuccessful = 0;

@@ -4003,73 +4056,16 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)

sendBackup_Buffering(buf, len, (curseq), (w_mc));

// CHECK: no sendable that exceeds unstable
// This embraces the case when there are no sendable at all.
// Note that unstable links still count as sendable; they
// are simply links that were qualified for sending, but:
// - have exceeded response timeout
// - have hit EASYNCSND error during sending
bool need_activate = sendable.size() <= unstable.size();
string activate_reason;
IF_HEAVY_LOGGING(activate_reason = "BY NO REASON???");
if (need_activate)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: all " << sendable.size() << " links unstable - will activate an idle link");
IF_HEAVY_LOGGING(activate_reason = "no stable links");
}
else
{
// Another reason to activate might be if the link with highest priority
// among the idlers has a higher priority than any link currently active
// (those are collected in 'sendable_pri'). Check if there are any (if
// no sendable, a new link needs to be activated anyway), and if the
// priority has a lower number.
if (sendable_pri.empty() || (!idlers.empty() && FPriorityOrder::check(idlers[0]->weight, *sendable_pri.begin())))
{
need_activate = true;
#if ENABLE_HEAVY_LOGGING
if (sendable_pri.empty())
{
activate_reason = "no successful links found";
LOGC(gslog.Debug,
log << "grp/sendBackup: no active links were successful - will activate an idle link");
}
else if (idlers.empty())
{
// This should be impossible.
activate_reason = "WEIRD (no idle links!)";
LOGC(gslog.Debug,
log << "grp/sendBackup: BY WEIRD AND IMPOSSIBLE REASON (IPE?) - will activate an idle link");
}
else
{
// Only now we are granted that both sendable_pri and idlers are nonempty
LOGC(gslog.Debug,
log << "grp/sendBackup: found link pri " << idlers[0]->weight << " PREF OVER " << (*sendable_pri.begin())
<< " (highest from sendable) - will activate an idle link");
activate_reason = "found higher pri link";
}
#endif
}
else
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: sendable_pri (" << sendable_pri.size() << "): " << Printable(sendable_pri)
<< " first idle pri: " << (idlers.size() > 0 ? int(idlers[0]->weight) : -1)
<< " - will NOT activate an idle link");
}
}

if (need_activate)
if (sendBackup_IsActivationNeeded(idlers, unstable, sendable, sendable_pri, activate_reason))
{
if (idlers.empty())
{
HLOGP(gslog.Debug, "grp/sendBackup: no idlers to activate, keeping only unstable links");
}
else
{
size_t n ATR_UNUSED = sendBackup_CheckNeedActivate(idlers,
const size_t n ATR_UNUSED = sendBackup_TryActivateIdleLink(idlers,
buf,
len,
(none_succeeded),
37 changes: 36 additions & 1 deletion srtcore/group.h
Original file line number Diff line number Diff line change
@@ -239,6 +239,21 @@ class CUDTGroup
/// @retval false running link is unstable
bool sendBackup_CheckRunningStability(const gli_t d, const time_point currtime);

/// Check link sending status
/// @param[in] d Group member iterator
/// @param[in] currtime Current time (logging only)
/// @param[in] stat Result of sending over the socket
/// @param[in] lastseq Last sent sequence number before the current sending operation
/// @param[in] pktseq Packet sequence number currently tried to be sent
/// @param[out] w_u CUDT unit of the current member (to allow calling overrideSndSeqNo)
/// @param[out] w_curseq Group's current sequence number (either -1 or the value used already for other links)
/// @param[out] w_parallel Parallel link container (will be filled inside this function)
/// @param[out] w_final_stat Status to be reported by this function eventually
/// @param[out] w_sendable_pri Weight value from every link (will auto-sort)
/// @param[out] w_nsuccessful Updates the number of successful links
/// @param[out] w_is_unstable Set true if sending resulted in AGAIN error.
///
/// @returns true if the sending operation result (submitted in stat) is a success, false otherwise.
bool sendBackup_CheckSendStatus(const gli_t d,
const time_point& currtime,
const int stat,
@@ -253,7 +268,27 @@ class CUDTGroup
size_t& w_nsuccessful,
bool& w_is_unstable);
void sendBackup_Buffering(const char* buf, const int len, int32_t& curseq, SRT_MSGCTRL& w_mc);
size_t sendBackup_CheckNeedActivate(const std::vector<gli_t>& idlers,

/// Check activation conditions and activate a backup link if needed.
/// Backup link activation is needed if:
///
/// 1. All currently active links are unstable.
/// Note that unstable links still count as sendable; they
/// are simply links that were qualified for sending, but:
/// - have exceeded response timeout
/// - have hit EASYNCSND error during sending
///
/// 2. Another reason to activate might be if one of idle links
/// has a higher weight than any link currently active
/// (those are collected in 'sendable_pri').
/// If there are no sendable, a new link needs to be activated anyway.
bool sendBackup_IsActivationNeeded(const std::vector<CUDTGroup::gli_t>& idlers,
const std::vector<gli_t>& unstable,
const std::vector<gli_t>& sendable,
const std::set<uint16_t> sendable_pri,
std::string& activate_reason) const;

size_t sendBackup_TryActivateIdleLink(const std::vector<gli_t>& idlers,
const char* buf,
const int len,
bool& w_none_succeeded,

0 comments on commit 26adb8b

Please sign in to comment.