Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixed group recv read-ready check. #2938

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5736,14 +5736,6 @@ void srt::CUDT::rewriteHandshakeData(const sockaddr_any& peer, CHandShake& w_hs)
void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, const CPacket& hspkt, CHandShake& w_hs)
{
HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: setting up data according to handshake");
#if ENABLE_BONDING
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
#endif

ScopedLock cg(m_ConnectionLock);

Expand Down Expand Up @@ -5831,6 +5823,16 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
throw CUDTException(MJ_SETUP, MN_REJECTED, 0);
}

#if ENABLE_BONDING
// The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called.
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
#endif

if (!prepareBuffers(NULL))
{
HLOGC(cnlog.Debug,
Expand Down Expand Up @@ -8695,16 +8697,15 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
#if ENABLE_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
ScopedLock glock(uglobal().m_GlobControlLock);
if (m_parent->m_GroupOf)
{
m_parent->m_GroupOf->synchronizeDrift(this);
}
}
ScopedLock glock(uglobal().m_GlobControlLock);
const bool drift_updated =
#endif
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);

#if ENABLE_BONDING
if (drift_updated)
m_parent->m_GroupOf->synchronizeDrift(this);
#endif
}

Expand Down
19 changes: 15 additions & 4 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2049,10 +2049,14 @@ vector<CUDTSocket*> CUDTGroup::recv_WaitForReadReady(const vector<CUDTSocket*>&
}
else
{
// No read-readiness reported by epoll, but probably missed or not yet handled
// as the receiver buffer is read-ready.
// No read-readiness reported by epoll, but can be missed or not yet handled
// while the receiver buffer is in fact read-ready.
ScopedLock lg(sock->core().m_RcvBufferLock);
if (sock->core().m_pRcvBuffer && sock->core().m_pRcvBuffer->isRcvDataReady())
if (!sock->core().m_pRcvBuffer)
continue;
// Checking for the next packet in the RCV buffer is safer that isReadReady(tnow).
const CRcvBuffer::PacketInfo info = sock->core().m_pRcvBuffer->getFirstValidPacketInfo();
if (info.seqno != SRT_SEQNO_NONE && !info.seq_gap)
readReady.push_back(sock);
}
}
Expand Down Expand Up @@ -2222,6 +2226,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
}

// Find the first readable packet among all member sockets.
steady_clock::time_point tnow = steady_clock::now();
CUDTSocket* socketToRead = NULL;
CRcvBuffer::PacketInfo infoToRead = {-1, false, time_point()};
for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)
Expand All @@ -2242,7 +2247,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
}

const CRcvBuffer::PacketInfo info =
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now());
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(tnow);
if (info.seqno == SRT_SEQNO_NONE)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read.");
Expand All @@ -2262,6 +2267,12 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
{
socketToRead = ps;
infoToRead = info;

if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && ((CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) == 1))
{
// We have the next packet. No need to check other read-ready sockets.
break;
}
}
}

Expand Down
Loading