From 9c0f352ddaca9228dd7aa3e97e38a768dc3620c7 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 22 Apr 2024 18:25:26 +0200 Subject: [PATCH 1/3] [core] Fixed group recv read-ready check. --- srtcore/group.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 282e8f25a..c0c17868e 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2049,10 +2049,14 @@ vector CUDTGroup::recv_WaitForReadReady(const vector& } else { - // No read-readiness reported by epoll, but probably missed or not yet handled - // as the receiver buffer is read-ready. + // No read-readiness reported by epoll, but can be missed or not yet handled + // while the receiver buffer is in fact read-ready. ScopedLock lg(sock->core().m_RcvBufferLock); - if (sock->core().m_pRcvBuffer && sock->core().m_pRcvBuffer->isRcvDataReady()) + if (!sock->core().m_pRcvBuffer) + continue; + // Checking for the next packet in the RCV buffer is safer that isReadReady(tnow). + const CRcvBuffer::PacketInfo info = sock->core().m_pRcvBuffer->getFirstValidPacketInfo(); + if (info.seqno != SRT_SEQNO_NONE && !info.seq_gap) readReady.push_back(sock); } } @@ -2222,6 +2226,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } // Find the first readable packet among all member sockets. + steady_clock::time_point tnow = steady_clock::now(); CUDTSocket* socketToRead = NULL; CRcvBuffer::PacketInfo infoToRead = {-1, false, time_point()}; for (vector::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si) @@ -2242,7 +2247,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } const CRcvBuffer::PacketInfo info = - ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now()); + ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(tnow); if (info.seqno == SRT_SEQNO_NONE) { HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read."); @@ -2262,6 +2267,12 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { socketToRead = ps; infoToRead = info; + + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && ((CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) == 1)) + { + // We have the next packet. No need to check other read-ready sockets. + break; + } } } From d488c00d1476175a67d1b15b82707f50eaefc8cf Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 25 Apr 2024 12:06:04 +0200 Subject: [PATCH 2/3] [core] Fixed time base sync in a group. --- srtcore/core.cpp | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index d12628220..b7c2eccce 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8695,16 +8695,15 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr // srt_recvfile (which doesn't make any sense), you'll have a deadlock. if (m_config.bDriftTracer) { - const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); #if ENABLE_BONDING - if (drift_updated && m_parent->m_GroupOf) - { - ScopedLock glock(uglobal().m_GlobControlLock); - if (m_parent->m_GroupOf) - { - m_parent->m_GroupOf->synchronizeDrift(this); - } - } + ScopedLock glock(uglobal().m_GlobControlLock); + const bool drift_updated = +#endif + m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); + +#if ENABLE_BONDING + if (drift_updated) + m_parent->m_GroupOf->synchronizeDrift(this); #endif } From 6f7caf58a3835363dbac762543c3e309a974b2c3 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 26 Apr 2024 11:44:42 +0200 Subject: [PATCH 3/3] [core] Fixed group synchronization of accepted sockets. Fixes #2941. --- srtcore/core.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index b7c2eccce..5740412d8 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5736,14 +5736,6 @@ void srt::CUDT::rewriteHandshakeData(const sockaddr_any& peer, CHandShake& w_hs) void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, const CPacket& hspkt, CHandShake& w_hs) { HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: setting up data according to handshake"); -#if ENABLE_BONDING - // Keep the group alive for the lifetime of this function, - // and do it BEFORE acquiring m_ConnectionLock to avoid - // lock inversion. - // This will check if a socket belongs to a group and if so - // it will remember this group and keep it alive here. - CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); -#endif ScopedLock cg(m_ConnectionLock); @@ -5831,6 +5823,16 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& throw CUDTException(MJ_SETUP, MN_REJECTED, 0); } +#if ENABLE_BONDING + // The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called. + // Keep the group alive for the lifetime of this function, + // and do it BEFORE acquiring m_ConnectionLock to avoid + // lock inversion. + // This will check if a socket belongs to a group and if so + // it will remember this group and keep it alive here. + CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); +#endif + if (!prepareBuffers(NULL)) { HLOGC(cnlog.Debug,