Skip to content

Commit

Permalink
[core] Moved post-hs-update locking code out of mutex (#1677).
Browse files Browse the repository at this point in the history
Restored proper protection of RIDVector. Removed lock prevention on asynchro update.
  • Loading branch information
ethouris authored Nov 30, 2020
1 parent 5bc58cd commit 03dafd8
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 105 deletions.
5 changes: 0 additions & 5 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1896,13 +1896,8 @@ int CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_
*/
try
{
// InvertedGuard unlocks in the constructor, then locks in the
// destructor, no matter if an exception has fired.
InvertedLock l_unlocker (s->m_pUDT->m_bSynRecving ? &s->m_ControlLock : 0);

// record peer address
s->m_PeerAddr = target_addr;

s->m_pUDT->startConnect(target_addr, forced_isn);
}
catch (CUDTException& e) // Interceptor, just to change the state.
Expand Down
32 changes: 18 additions & 14 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4074,7 +4074,7 @@ void CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
if (m_pRcvQueue->recvfrom(m_SocketID, (response)) > 0)
{
HLOGC(cnlog.Debug, log << CONID() << "startConnect: got response for connect request");
cst = processConnectResponse(response, &e, COM_SYNCHRO);
cst = processConnectResponse(response, &e);

HLOGC(cnlog.Debug, log << CONID() << "startConnect: response processing result: " << ConnectStatusStr(cst));

Expand Down Expand Up @@ -4107,7 +4107,7 @@ void CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
// it means that it has done all that was required, however none of the below
// things has to be done (this function will do it by itself if needed).
// Otherwise the handshake rolling can be interrupted and considered complete.
cst = processRendezvous(response, serv_addr, true /*synchro*/, RST_OK, (reqpkt));
cst = processRendezvous(response, serv_addr, RST_OK, (reqpkt));
if (cst == CONN_CONTINUE)
continue;
break;
Expand Down Expand Up @@ -4237,9 +4237,9 @@ EConnectStatus CUDT::processAsyncConnectResponse(const CPacket &pkt) ATR_NOEXCEP
EConnectStatus cst = CONN_CONTINUE;
CUDTException e;

ScopedLock cg(m_ConnectionLock); // FIX
ScopedLock cg(m_ConnectionLock);
HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectResponse: got response for connect request, processing");
cst = processConnectResponse(pkt, &e, COM_ASYNCHRO);
cst = processConnectResponse(pkt, &e);

HLOGC(cnlog.Debug,
log << CONID() << "processAsyncConnectResponse: response processing result: " << ConnectStatusStr(cst)
Expand Down Expand Up @@ -4275,10 +4275,12 @@ bool CUDT::processAsyncConnectRequest(EReadStatus rst,

bool status = true;

ScopedLock cg(m_ConnectionLock);

if (cst == CONN_RENDEZVOUS)
{
HLOGC(cnlog.Debug, log << "processAsyncConnectRequest: passing to processRendezvous");
cst = processRendezvous(response, serv_addr, false /*asynchro*/, rst, (request));
cst = processRendezvous(response, serv_addr, rst, (request));
if (cst == CONN_ACCEPT)
{
HLOGC(cnlog.Debug,
Expand Down Expand Up @@ -4389,7 +4391,7 @@ void CUDT::cookieContest()

EConnectStatus CUDT::processRendezvous(
const CPacket& response, const sockaddr_any& serv_addr,
bool synchro, EReadStatus rst, CPacket& w_reqpkt)
EReadStatus rst, CPacket& w_reqpkt)
{
if (m_RdvState == CHandShake::RDV_CONNECTED)
{
Expand Down Expand Up @@ -4638,7 +4640,7 @@ EConnectStatus CUDT::processRendezvous(
// When synchro=false, don't lock a mutex for rendezvous queue.
// This is required when this function is called in the
// receive queue worker thread - it would lock itself.
int cst = postConnect(response, true, 0, synchro);
int cst = postConnect(response, true, 0);
if (cst == CONN_REJECT)
{
// m_RejectReason already set
Expand Down Expand Up @@ -4714,7 +4716,8 @@ EConnectStatus CUDT::processRendezvous(
return CONN_CONTINUE;
}

EConnectStatus CUDT::processConnectResponse(const CPacket& response, CUDTException* eout, EConnectMethod synchro) ATR_NOEXCEPT
// [[using locked(m_ConnectionLock)]];
EConnectStatus CUDT::processConnectResponse(const CPacket& response, CUDTException* eout) ATR_NOEXCEPT
{
// NOTE: ASSUMED LOCK ON: m_ConnectionLock.

Expand Down Expand Up @@ -4773,7 +4776,7 @@ EConnectStatus CUDT::processConnectResponse(const CPacket& response, CUDTExcepti
m_RdvState = CHandShake::RDV_CONNECTED;
}

return postConnect(response, hsv5, eout, synchro);
return postConnect(response, hsv5, eout);
}

if (!response.isControl(UMSG_HANDSHAKE))
Expand Down Expand Up @@ -4943,7 +4946,7 @@ EConnectStatus CUDT::processConnectResponse(const CPacket& response, CUDTExcepti
}
}

return postConnect(response, false, eout, synchro);
return postConnect(response, false, eout);
}

void CUDT::applyResponseSettings() ATR_NOEXCEPT
Expand All @@ -4967,7 +4970,7 @@ void CUDT::applyResponseSettings() ATR_NOEXCEPT
<< " peerID=" << m_ConnRes.m_iID);
}

EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTException *eout, bool synchro) ATR_NOEXCEPT
EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTException *eout) ATR_NOEXCEPT
{
if (m_ConnRes.m_iVersion < HS_VERSION_SRT1)
m_tsRcvPeerStartTime = steady_clock::time_point(); // will be set correctly in SRT HS.
Expand Down Expand Up @@ -5088,7 +5091,7 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE
// because otherwise the packets that are coming for this socket before the
// connection process is complete will be rejected as "attack", instead of
// being enqueued for later pickup from the queue.
m_pRcvQueue->removeConnector(m_SocketID, synchro);
m_pRcvQueue->removeConnector(m_SocketID);

// Ok, no more things to be done as per "clear connecting state"
if (!s)
Expand Down Expand Up @@ -8100,7 +8103,7 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
{
const int32_t first_seq ATR_UNUSED = ackDataUpTo(ack);
bufflock.unlock();
InvertedLock un_bufflock (m_RcvBufferLock);

#if ENABLE_EXPERIMENTAL_BONDING
// This actually should be done immediately after the ACK pointers were
Expand Down Expand Up @@ -8178,7 +8181,6 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
#endif
CGlobEvent::triggerEvent();
}
bufflock.lock();
}
else if (ack == m_iRcvLastAck)
{
Expand Down Expand Up @@ -10608,6 +10610,8 @@ int32_t CUDT::bake(const sockaddr_any& addr, int32_t current_cookie, int correct
//
// XXX Make this function return EConnectStatus enum type (extend if needed),
// and this will be directly passed to the caller.

// [[using locked(m_pRcvQueue->m_LSLock)]];
int CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet)
{
// XXX ASSUMPTIONS:
Expand Down
8 changes: 3 additions & 5 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ class CUDT
/// @retval 1 Connection in progress (m_ConnReq turned into RESPONSE)
/// @retval -1 Connection failed

SRT_ATR_NODISCARD EConnectStatus processConnectResponse(const CPacket& pkt, CUDTException* eout, EConnectMethod synchro) ATR_NOEXCEPT;
SRT_ATR_NODISCARD EConnectStatus processConnectResponse(const CPacket& pkt, CUDTException* eout) ATR_NOEXCEPT;

// This function works in case of HSv5 rendezvous. It changes the state
// according to the present state and received message type, as well as the
Expand All @@ -542,12 +542,10 @@ class CUDT
/// @param reqpkt Packet to be written with handshake data
/// @param response incoming handshake response packet to be interpreted
/// @param serv_addr incoming packet's address
/// @param synchro True when this function was called in blocking mode
/// @param rst Current read status to know if the HS packet was freshly received from the peer, or this is only a periodic update (RST_AGAIN)
SRT_ATR_NODISCARD EConnectStatus processRendezvous(const CPacket &response, const sockaddr_any& serv_addr, bool synchro, EReadStatus,
CPacket& reqpkt);
SRT_ATR_NODISCARD EConnectStatus processRendezvous(const CPacket &response, const sockaddr_any& serv_addr, EReadStatus, CPacket& reqpkt);
SRT_ATR_NODISCARD bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException *eout);
SRT_ATR_NODISCARD EConnectStatus postConnect(const CPacket& response, bool rendezvous, CUDTException* eout, bool synchro) ATR_NOEXCEPT;
SRT_ATR_NODISCARD EConnectStatus postConnect(const CPacket& response, bool rendezvous, CUDTException* eout) ATR_NOEXCEPT;
void applyResponseSettings() ATR_NOEXCEPT;
SRT_ATR_NODISCARD EConnectStatus processAsyncConnectResponse(const CPacket& pkt) ATR_NOEXCEPT;
SRT_ATR_NODISCARD bool processAsyncConnectRequest(EReadStatus rst, EConnectStatus cst, const CPacket& response, const sockaddr_any& serv_addr);
Expand Down
Loading

0 comments on commit 03dafd8

Please sign in to comment.