Skip to content

Commit

Permalink
[core] Reduced nesting of checkBrokenSockets()
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Oct 19, 2021
1 parent c1fdb61 commit 3f2945c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 58 deletions.
106 changes: 49 additions & 57 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2572,10 +2572,6 @@ void srt::CUDTUnited::checkBrokenSockets()
{
ScopedLock cg(m_GlobControlLock);

// set of sockets To Be Closed and To Be Removed
vector<SRTSOCKET> tbc;
vector<SRTSOCKET> tbr;

#if ENABLE_EXPERIMENTAL_BONDING
vector<SRTSOCKET> delgids;

Expand All @@ -2600,74 +2596,70 @@ void srt::CUDTUnited::checkBrokenSockets()
{
m_ClosedGroups.erase(*di);
}

#endif

for (sockets_t::iterator i = m_Sockets.begin();
i != m_Sockets.end(); ++ i)
// set of sockets To Be Closed and To Be Removed
vector<SRTSOCKET> tbc;
vector<SRTSOCKET> tbr;

for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)
{
CUDTSocket* s = i->second;
CUDTSocket* s = i->second;
if (!s->core().m_bBroken)
continue;

// check broken connection
if (s->core().m_bBroken)
if (s->m_Status == SRTS_LISTENING)
{
if (s->m_Status == SRTS_LISTENING)
{
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
// for a listening socket, it should wait an extra 3 seconds
// in case a client is connecting
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
// A listening socket should wait an extra 3 seconds
// in case a client is connecting.
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
continue;
}
else if ((s->core().m_pRcvBuffer != NULL)
// FIXED: calling isRcvDataAvailable() just to get the information
// whether there are any data waiting in the buffer,
// NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
// this function is called (isRcvDataReady also checks if the
// available data is "ready to play").
&& s->core().m_pRcvBuffer->isRcvDataAvailable())
{
const int bc = s->core().m_iBrokenCounter.load();
if (bc > 0)
{
// if there is still data in the receiver buffer, wait longer
s->core().m_iBrokenCounter.store(bc - 1);
continue;
}
}
else if ((s->core().m_pRcvBuffer != NULL)
// FIXED: calling isRcvDataAvailable() just to get the information
// whether there are any data waiting in the buffer,
// NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
// this function is called (isRcvDataReady also checks if the
// available data is "ready to play").
&& s->core().m_pRcvBuffer->isRcvDataAvailable())
{
const int bc = s->core().m_iBrokenCounter.load();
if (bc > 0)
{
// HLOGF(smlog.Debug, "STILL KEEPING socket (still have data):
// %d\n", i->first);
// if there is still data in the receiver buffer, wait longer
s->core().m_iBrokenCounter.store(bc - 1);
continue;
}
}
}

#if ENABLE_EXPERIMENTAL_BONDING
if (s->m_GroupOf)
{
LOGC(smlog.Note, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
s->removeFromGroup(true);
}
if (s->m_GroupOf)
{
LOGC(smlog.Note, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
s->removeFromGroup(true);
}
#endif

HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);
HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);

//close broken connections and start removal timer
s->setClosed();
tbc.push_back(i->first);
m_ClosedSockets[i->first] = s;
//close broken connections and start removal timer
s->setClosed();
tbc.push_back(i->first);
m_ClosedSockets[i->first] = s;

// remove from listener's queue
sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
if (ls == m_Sockets.end())
{
ls = m_ClosedSockets.find(s->m_ListenSocket);
if (ls == m_ClosedSockets.end())
continue;
}

enterCS(ls->second->m_AcceptLock);
ls->second->m_QueuedSockets.erase(s->m_SocketID);
leaveCS(ls->second->m_AcceptLock);
// remove from listener's queue
sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
if (ls == m_Sockets.end())
{
ls = m_ClosedSockets.find(s->m_ListenSocket);
if (ls == m_ClosedSockets.end())
continue;
}

enterCS(ls->second->m_AcceptLock);
ls->second->m_QueuedSockets.erase(s->m_SocketID);
leaveCS(ls->second->m_AcceptLock);
}

for (sockets_t::iterator j = m_ClosedSockets.begin();
Expand Down
3 changes: 2 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,8 @@ class CUDT
sync::atomic<bool> m_bPeerHealth; // If the peer status is normal
sync::atomic<int> m_RejectReason;
bool m_bOpened; // If the UDT entity has been opened
sync::atomic<int> m_iBrokenCounter; // A counter (number of GC checks) to let the GC tag this socket as disconnected
// A counter (number of GC checks happening every 1s) to let the GC tag this socket as closed.
sync::atomic<int> m_iBrokenCounter; // If a broken socket still has data in the receiver buffer, it is not marked closed until the counter is 0.

int m_iEXPCount; // Expiration counter
sync::atomic<int> m_iBandwidth; // Estimated bandwidth, number of packets per second
Expand Down

0 comments on commit 3f2945c

Please sign in to comment.