Skip to content

Commit

Permalink
[core] Added CRendezvousQueue::qualifyToHandle.
Browse files Browse the repository at this point in the history
Now TTL of a socket pending for connection is checked on every update iteration -> more precise connection timeout.
  • Loading branch information
maxsharabayko committed May 12, 2021
1 parent 36f8995 commit 917a715
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 167 deletions.
276 changes: 111 additions & 165 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -885,176 +885,30 @@ CUDT* CRendezvousQueue::retrieve(const sockaddr_any& addr, SRTSOCKET& w_id) cons
return NULL;
}

struct LinkStatusInfo
void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, const CPacket &pktIn)
{
CUDT* u;
SRTSOCKET id;
int errorcode;
sockaddr_any peeraddr;
int token;
vector<LinkStatusInfo> toRemove, toProcess;

struct HasID
{
SRTSOCKET id;
HasID(SRTSOCKET p): id(p) {}
bool operator()(const LinkStatusInfo& i)
{
return i.id == id;
}
};
};

void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, const CPacket &response)
{
vector<LinkStatusInfo> ufailed, uprocess;

#if ENABLE_HEAVY_LOGGING
int debug_nupd = 0;
int debug_nrun = 0;
int debug_nfail = 0;
#endif

{
ScopedLock vg(m_RIDListLock);

if (m_lRendezvousID.empty())
return;

HLOGC(cnlog.Debug,
log << "updateConnStatus: updating after getting pkt id=" << response.m_iID
<< " status: " << ConnectStatusStr(cst));

for (list<CRL>::iterator i = m_lRendezvousID.begin(), i_next = i; i != m_lRendezvousID.end(); i = i_next)
{
++i_next;
// NOTE: This is a SAFE LOOP.
// Incrementation will be done at the end, after the processing did not
// REMOVE the currently processed element. When the element was removed,
// the iterator value for the next iteration will be taken from erase()'s result.

// RST_AGAIN happens in case when the last attempt to read a packet from the UDP
// socket has read nothing. In this case it would be a repeated update, while
// still waiting for a response from the peer. When we have any other state here
// (most expectably CONN_CONTINUE or CONN_RENDEZVOUS, which means that a packet has
// just arrived in this iteration), do the update immediately (in SRT this also
// involves additional incoming data interpretation, which wasn't the case in UDT).

// Use "slow" cyclic responding in case when
// - RST_AGAIN (no packet was received for whichever socket)
// - a packet was received, but not for THIS socket
if (rst == RST_AGAIN || i->m_iID != response.m_iID)
{
// If no packet has been received from the peer,
// avoid sending too many requests, at most 1 request per 250ms
const steady_clock::time_point then = i->m_pUDT->m_tsLastReqTime;
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration timeout_250ms = milliseconds_from(250);
const bool now_is_time = (now - then) > timeout_250ms;
HLOGC(cnlog.Debug,
log << "RID:@" << i->m_iID << " then=" << FormatTime(then)
<< " now=" << FormatTime(now) << " passed=" << count_microseconds(now - then)
<< "<=> 250000 -- now's " << (now_is_time ? "" : "NOT ") << "the time");

if (!now_is_time)
continue;
}

HLOGC(cnlog.Debug, log << "RID:@" << i->m_iID << " cst=" << ConnectStatusStr(cst) << " -- sending update NOW.");

#if ENABLE_HEAVY_LOGGING
++debug_nrun;
#endif

// XXX This looks like a loop that rolls in infinity without any sleeps
// inside and makes it once per about 50 calls send a hs conclusion
// for a randomly sampled rendezvous ID of a socket out of the list.
// Ok, probably the rendezvous ID should be just one so not much to
// sample from, but if so, why the container?
//
// This must be somehow fixed!
//
// Maybe the time should be simply checked once and the whole loop not
// done when "it's not the time"?
const steady_clock::time_point now = steady_clock::now();
if (now >= i->m_tsTTL)
{
HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID
<< " removed - EXPIRED ("
// The "enforced on FAILURE" is below when processAsyncConnectRequest failed.
<< (is_zero(i->m_tsTTL) ? "enforced on FAILURE" : "passed TTL")
<< "). WILL REMOVE from queue");

// Set appropriate error information, but do not update yet.
// Exit the lock first. Collect objects to update them later.
int ccerror = SRT_ECONNREJ;
if (i->m_pUDT->m_RejectReason == SRT_REJ_UNKNOWN)
{
if (!is_zero(i->m_tsTTL))
{
// Timer expired, set TIMEOUT forcefully
i->m_pUDT->m_RejectReason = SRT_REJ_TIMEOUT;
ccerror = SRT_ENOSERVER;
}
else
{
// In case of unknown reason, rejection should at least
// suggest error on the peer
i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
}
}

// The call to completeBrokenConnectionDependencies() cannot happen here
// under the lock of m_RIDListLock as it risks a deadlock. Collect it
// to update later.
LinkStatusInfo fi = {i->m_pUDT, i->m_iID, ccerror, i->m_PeerAddr, -1};
ufailed.push_back(fi);

// i_next was preincremented, but this is guaranteed to point to
// the element next to erased one.
i_next = m_lRendezvousID.erase(i);
continue;
}
else
{
HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " still active (remaining "
<< std::fixed << (count_microseconds(i->m_tsTTL - now)/1000000.0) << "s of TTL)...");
}

// This queue is used only in case of Async mode (rendezvous or caller-listener).
// Synchronous connection requests are handled in startConnect() completely.
if (!i->m_pUDT->m_config.bSynRecving)
{
IF_HEAVY_LOGGING(++debug_nupd);

// Collect them so that they can be updated out of m_RIDListLock.
LinkStatusInfo fi = { i->m_pUDT, i->m_iID, SRT_SUCCESS, i->m_PeerAddr, -1};
uprocess.push_back(fi);
// NOTE: safe loop, the incrementation was done before the loop body,
// so the `i' node can be safely deleted. Just the body must end here.
continue;
}
else
{
HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " deemed SYNCHRONOUS, NOT UPDATING");
}
}

}
// If no socket were qualified for further handling, finish here.
// Otherwise toRemove and toProcess contain items to handle.
if (!qualifyToHandle(rst, cst, pktIn.m_iID, toRemove, toProcess))
return;

// [[using locked()]];

HLOGC(cnlog.Debug, log << "updateConnStatus: collected " << uprocess.size() << " for processing, "
<< ufailed.size() << " to close");
HLOGC(cnlog.Debug, log << "updateConnStatus: collected " << toProcess.size() << " for processing, "
<< toRemove.size() << " to close");

for (vector<LinkStatusInfo>::iterator i = uprocess.begin(); i != uprocess.end(); ++i)
// Repeat (resend) connection request.
for (vector<LinkStatusInfo>::iterator i = toProcess.begin(); i != toProcess.end(); ++i)
{
// IMPORTANT INFORMATION concerning changes towards UDT legacy.
// In the UDT code there was no attempt to interpret any incoming data.
// All data from the incoming packet were considered to be already deployed into
// m_ConnRes field, and m_ConnReq field was considered at this time accordingly updated.
// Therefore this procedure did only one thing: craft a new handshake packet and send it.
// In SRT this may also interpret extra data (extensions in case when Agent is Responder)
// and the `response` packet may sometimes contain no data. Therefore the passed `rst`
// and the `pktIn` packet may sometimes contain no data. Therefore the passed `rst`
// must be checked to distinguish the call by periodic update (RST_AGAIN) from a call
// due to have received the packet (RST_OK).
//
Expand All @@ -1065,23 +919,22 @@ void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, con
EReadStatus read_st = rst;
EConnectStatus conn_st = cst;

if (i->id != response.m_iID)
if (i->id != pktIn.m_iID)
{
read_st = RST_AGAIN;
conn_st = CONN_AGAIN;
}

HLOGC(cnlog.Debug, log << "updateConnStatus: processing async conn for @" << i->id << " FROM " << i->peeraddr.str());

if (!i->u->processAsyncConnectRequest(read_st, conn_st, response, i->peeraddr))
if (!i->u->processAsyncConnectRequest(read_st, conn_st, pktIn, i->peeraddr))
{
// cst == CONN_REJECT can only be result of worker_ProcessAddressedPacket and
// its already set in this case.
LinkStatusInfo fi = *i;
fi.errorcode = SRT_ECONNREJ;
ufailed.push_back(fi);
toRemove.push_back(fi);
i->u->sendCtrl(UMSG_SHUTDOWN);
IF_HEAVY_LOGGING(++debug_nfail);
}

}
Expand All @@ -1092,7 +945,7 @@ void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, con
// they are moved to ClosedSockets and it is believed that this function will
// not be held on mutexes that long.

for (vector<LinkStatusInfo>::iterator i = ufailed.begin(); i != ufailed.end(); ++i)
for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
{
HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
/*
Expand Down Expand Up @@ -1120,17 +973,110 @@ void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, con
ScopedLock vg(m_RIDListLock);
for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
{
if (find_if(ufailed.begin(), ufailed.end(), LinkStatusInfo::HasID(i->m_iID)) != ufailed.end())
if (find_if(toRemove.begin(), toRemove.end(), LinkStatusInfo::HasID(i->m_iID)) != toRemove.end())
{
LOGC(cnlog.Error, log << "updateConnStatus: processAsyncConnectRequest FAILED on @" << i->m_iID << ". Setting TTL as EXPIRED.");
i->m_tsTTL = steady_clock::time_point(); // Make it expire right now, will be picked up at the next iteration
}
}
}
}

/// Walk the list of pending connections (under m_RIDListLock) and sort its
/// entries into two work lists that the caller handles after the lock is released.
///
/// - An entry whose TTL has passed (or was zeroed to force expiration) is erased
///   from m_lRendezvousID right away and appended to `toRemove`, together with the
///   error code to report.
/// - An entry that is still alive and is due for a connection request is appended
///   to `toProcess`, but only if its socket is in non-blocking receive mode
///   (m_config.bSynRecving is false).
///
/// @param rst        result of reading from the UDP socket; RST_AGAIN means nothing was read.
/// @param cst        connection status; used only in heavy-logging messages here.
/// @param iDstSockID destination socket ID of the received packet.
/// @param toRemove   [in,out] receives entries that expired and were erased from the queue.
/// @param toProcess  [in,out] receives entries whose connection request should be handled now.
/// @return true if at least one entry was added to `toRemove` or `toProcess`
///         (or either was already non-empty on entry); false if there is nothing to handle.
bool CRendezvousQueue::qualifyToHandle(EReadStatus rst, EConnectStatus cst SRT_ATR_UNUSED,
        int iDstSockID, vector<LinkStatusInfo>& toRemove, vector<LinkStatusInfo>& toProcess)
{
    ScopedLock vg(m_RIDListLock);

    if (m_lRendezvousID.empty())
        return false; // nothing to process.

    HLOGC(cnlog.Debug,
          log << "updateConnStatus: updating after getting pkt with DST socket ID @" << iDstSockID
              << " status: " << ConnectStatusStr(cst));

    for (list<CRL>::iterator i = m_lRendezvousID.begin(), i_next = i; i != m_lRendezvousID.end(); i = i_next)
    {
        // Safe iterator to the next element. If the current element is erased, the iterator is updated again.
        ++i_next;

        const steady_clock::time_point tsNow = steady_clock::now();

        // The TTL is checked first, on every call, so that expiration does not
        // depend on the 250 ms request-repeat throttle below.
        if (tsNow >= i->m_tsTTL)
        {
            HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID
                    << " removed - EXPIRED ("
                    // The "enforced on FAILURE" is below when processAsyncConnectRequest failed.
                    << (is_zero(i->m_tsTTL) ? "enforced on FAILURE" : "passed TTL")
                    << "). WILL REMOVE from queue.");

            // Set appropriate error information, but do not update yet.
            // Exit the lock first. Collect objects to update them later.
            int ccerror = SRT_ECONNREJ;
            if (i->m_pUDT->m_RejectReason == SRT_REJ_UNKNOWN)
            {
                if (!is_zero(i->m_tsTTL))
                {
                    // Timer expired, set TIMEOUT forcefully
                    i->m_pUDT->m_RejectReason = SRT_REJ_TIMEOUT;
                    ccerror = SRT_ENOSERVER;
                }
                else
                {
                    // In case of unknown reason, rejection should at least
                    // suggest error on the peer
                    i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
                }
            }

            // The call to completeBrokenConnectionDependencies() cannot happen here
            // under the lock of m_RIDListLock as it risks a deadlock.
            // Collect in 'toRemove' to update later.
            LinkStatusInfo fi = { i->m_pUDT, i->m_iID, ccerror, i->m_PeerAddr, -1 };
            toRemove.push_back(fi);

            // i_next was preincremented, but this is guaranteed to point to
            // the element next to erased one.
            i_next = m_lRendezvousID.erase(i);
            continue;
        }
        else
        {
            HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " still active (remaining "
                    << std::fixed << (count_microseconds(i->m_tsTTL - tsNow) / 1000000.0) << "s of TTL)...");
        }

        const steady_clock::time_point tsLastReq = i->m_pUDT->m_tsLastReqTime;
        const steady_clock::time_point tsRepeat = tsLastReq + milliseconds_from(250); // Repeat connection request (send HS).

        // A connection request is repeated every 250 ms if there was no response from the peer:
        // - RST_AGAIN means no packet was received over UDP.
        // - a packet was received, but not for THIS socket.
        if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat)
        {
            // A separator is needed between the socket ID and the elapsed time,
            // otherwise the two numbers are fused in the log output.
            HLOGC(cnlog.Debug,
                  log << "RID:@" << i->m_iID << " " << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0
                      << " ms passed since last connection request.");

            continue;
        }

        HLOGC(cnlog.Debug, log << "RID:@" << i->m_iID << " cst=" << ConnectStatusStr(cst) << " -- repeating connection request.");

        // This queue is used only in case of Async mode (rendezvous or caller-listener).
        // Synchronous connection requests are handled in startConnect() completely.
        if (!i->m_pUDT->m_config.bSynRecving)
        {
            // Collect them so that they can be updated out of m_RIDListLock.
            LinkStatusInfo fi = { i->m_pUDT, i->m_iID, SRT_SUCCESS, i->m_PeerAddr, -1 };
            toProcess.push_back(fi);
        }
        else
        {
            HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " is SYNCHRONOUS, NOT UPDATING");
        }
    }

    return !toRemove.empty() || !toProcess.empty();
}

//
Expand Down
36 changes: 34 additions & 2 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,40 @@ class CRendezvousQueue
/// Stop connecting if TTL expires. Resend handshake request every 250 ms if no response from the peer.
/// @param rst result of reading from a UDP socket: received packet / nothing read / read error.
/// @param cst target status for pending connection: reject or proceed.
/// @param response packet received from the UDP socket.
void updateConnStatus(EReadStatus rst, EConnectStatus cst, const CPacket& response);
/// @param pktIn packet received from the UDP socket.
void updateConnStatus(EReadStatus rst, EConnectStatus cst, const CPacket& pktIn);

private:
/// Snapshot of one pending-connection entry. It is filled while the rendezvous
/// list is locked and handled after the lock is released.
/// Kept as a plain aggregate (no constructors) because it is brace-initialized.
struct LinkStatusInfo
{
    CUDT*        u;         ///< Socket object of the pending connection.
    SRTSOCKET    id;        ///< Socket ID of the pending connection.
    int          errorcode; ///< SRT_SUCCESS for entries to process, an error code for entries to remove.
    sockaddr_any peeraddr;  ///< Peer address of the pending connection.
    int          token;     ///< Initialized to -1 by the collecting code; NOTE(review): purpose not visible here.

    /// Unary predicate for std::find_if: matches a LinkStatusInfo by socket ID.
    struct HasID
    {
        SRTSOCKET id;
        HasID(SRTSOCKET p) : id(p) {}

        // const: the comparison does not modify the predicate, so it can also
        // be invoked through a const reference.
        bool operator()(const LinkStatusInfo& i) const
        {
            return i.id == id;
        }
    };
};

/// @brief Qualify pending connections:
/// - Sockets with expired TTL are added to the 'toRemove' list and erased from the queue straight away.
/// - Sockets whose HS request is to be resent (every 250 ms if there is no response from the peer)
///   are added to the 'toProcess' list.
///
/// @param rst result of reading from a UDP socket: received packet / nothing read / read error.
/// @param cst target status for pending connection: reject or proceed.
/// @param iDstSockID destination socket ID of the received packet.
/// @param[in,out] toRemove stores sockets with expired TTL.
/// @param[in,out] toProcess stores sockets which should repeat (resend) HS connection request.
/// @return true if at least one socket was collected into 'toRemove' or 'toProcess', false otherwise.
bool qualifyToHandle(EReadStatus rst, EConnectStatus cst, int iDstSockID,
std::vector<LinkStatusInfo>& toRemove, std::vector<LinkStatusInfo>& toProcess);

private:
struct CRL
Expand Down

0 comments on commit 917a715

Please sign in to comment.