diff --git a/srtcore/api.cpp b/srtcore/api.cpp index e64c60f29..bb5dd64fe 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1914,11 +1914,28 @@ int srt::CUDTUnited::close(const SRTSOCKET u) return 0; } #endif - CUDTSocket* s = locateSocket(u); - if (!s) - throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0); +#if ENABLE_HEAVY_LOGGING + // Wrapping the log into a destructor so that it + // is printed AFTER the destructor of SocketKeeper. + struct ScopedExitLog + { + const CUDTSocket* const ps; + ScopedExitLog(const CUDTSocket* p): ps(p){} + ~ScopedExitLog() + { + if (ps) // Could be not acquired by SocketKeeper, occasionally + { + HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); + } + } + }; +#endif + + SocketKeeper k(*this, u, ERH_THROW); + IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket)); + HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy()); - return close(s); + return close(k.socket); } #if ENABLE_BONDING @@ -2547,6 +2564,45 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s) } #endif +srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh) +{ + ScopedLock cg(m_GlobControlLock); + + CUDTSocket* s = locateSocket_LOCKED(u); + if (!s) + { + if (erh == ERH_THROW) + throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0); + return NULL; + } + + s->apiAcquire(); + return s; +} + +bool srt::CUDTUnited::acquireSocket(CUDTSocket* s) +{ + // Note that before using this function you must be certain + // that the socket isn't broken already and it still has at least + // one more GC cycle to live. In other words, you must be certain + // that this pointer passed here isn't dangling and was obtained + // directly from m_Sockets, or even better, has been acquired + // by some other functionality already, which is only about to + // be released earlier than you need. 
+ ScopedLock cg(m_GlobControlLock); + s->apiAcquire(); + // Keep the lock so that no one changes anything in the meantime. + // If the socket m_Status == SRTS_CLOSED (set by setClosed()), then + // this socket is no longer present in the m_Sockets container + if (s->m_Status >= SRTS_BROKEN) + { + s->apiRelease(); + return false; + } + + return true; +} + srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn) { ScopedLock cg(m_GlobControlLock); @@ -2613,7 +2669,7 @@ void srt::CUDTUnited::checkBrokenSockets() if (s->m_Status == SRTS_LISTENING) { - const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp; + const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load(); // A listening socket should wait an extra 3 seconds // in case a client is connecting. if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS)) @@ -2672,6 +2728,20 @@ void srt::CUDTUnited::checkBrokenSockets() for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j) { CUDTSocket* ps = j->second; + + // NOTE: There is still a hypothetical risk here that ps + // was made busy while the socket was already moved to m_ClosedSockets, + // if the socket was acquired through CUDTUnited::acquireSocket (that is, + // busy flag acquisition was done through the CUDTSocket* pointer rather + // than through the numeric ID). Therefore this way of busy acquisition + // should be done only if at the moment of acquisition there are certainly + // other conditions applying on the socket that prevent it from being deleted.
+ if (ps->isStillBusy()) + { + HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE."); + continue; + } + CUDT& u = ps->core(); // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first); @@ -2691,7 +2761,7 @@ void srt::CUDTUnited::checkBrokenSockets() // timeout 1 second to destroy a socket AND it has been removed from // RcvUList const steady_clock::time_point now = steady_clock::now(); - const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp; + const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load(); if (closed_ago > seconds_from(1)) { CRNode* rnode = u.m_pRNode; @@ -2741,6 +2811,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) if (rn && rn->m_bOnList) return; + if (s->isStillBusy()) + { + HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting"); + return; + } + + LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy()); + #if ENABLE_BONDING if (s->m_GroupOf) { diff --git a/srtcore/api.h b/srtcore/api.h index 6dbad6634..b16784bbd 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -123,6 +123,18 @@ class CUDTSocket void construct(); +private: + srt::sync::atomic m_iBusy; +public: + void apiAcquire() { ++m_iBusy; } + void apiRelease() { --m_iBusy; } + + int isStillBusy() const + { + return m_iBusy; + } + + SRT_ATTR_GUARDED_BY(m_ControlLock) sync::atomic m_Status; //< current socket state @@ -131,7 +143,8 @@ class CUDTSocket /// of sockets in order to prevent other methods from accessing invalid address. /// A timer is started and the socket will be removed after approximately /// 1 second (see CUDTUnited::checkBrokenSockets()). 
- sync::steady_clock::time_point m_tsClosureTimeStamp; + //sync::steady_clock::time_point m_tsClosureTimeStamp; + sync::AtomicClock m_tsClosureTimeStamp; sockaddr_any m_SelfAddr; //< local address of the socket sockaddr_any m_PeerAddr; //< peer address of the socket @@ -324,7 +337,7 @@ class CUDTUnited int epoll_release(const int eid); #if ENABLE_BONDING - // [[using locked(m_GlobControlLock)]] + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock) CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type) { // This only ensures that the element exists. @@ -346,7 +359,7 @@ class CUDTUnited void deleteGroup(CUDTGroup* g); void deleteGroup_LOCKED(CUDTGroup* g); - // [[using locked(m_GlobControlLock)]] + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock) CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup) { for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i) @@ -445,8 +458,50 @@ class CUDTUnited } } }; - #endif + + CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN); + bool acquireSocket(CUDTSocket* s); + +public: + struct SocketKeeper + { + CUDTSocket* socket; + + SocketKeeper(): socket(NULL) {} + + // This is intended for API functions to lock the socket's existence + // for the lifetime of their call. + SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); } + + // This is intended for TSBPD thread that should lock the socket's + // existence until it exits. + SocketKeeper(CUDTUnited& glob, CUDTSocket* s) + { + acquire(glob, s); + } + + // Note: acquire doesn't check if the keeper already keeps anything. + // This is only for a use together with an empty constructor. + bool acquire(CUDTUnited& glob, CUDTSocket* s) + { + const bool caught = glob.acquireSocket(s); + socket = caught ? 
s : NULL; + return caught; + } + + ~SocketKeeper() + { + if (socket) + { + SRT_ASSERT(socket->isStillBusy() > 0); + socket->apiRelease(); + } + } + }; + +private: + void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL); bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index bae245392..eca2b2069 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5844,6 +5844,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& } #if ENABLE_BONDING + m_ConnectionLock.unlock(); // The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called. // Keep the group alive for the lifetime of this function, // and do it BEFORE acquiring m_ConnectionLock to avoid @@ -5851,6 +5852,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& // This will check if a socket belongs to a group and if so // it will remember this group and keep it alive here. CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); + m_ConnectionLock.lock(); #endif if (!prepareBuffers(NULL)) @@ -8759,7 +8761,7 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr if (m_config.bDriftTracer) { #if ENABLE_BONDING - ScopedLock glock(uglobal().m_GlobControlLock); + ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive? const bool drift_updated = #endif m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); @@ -11737,17 +11739,20 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode) // Bound to one call because this requires locking pg->updateFailedLink(); } + // Sockets that never succeeded to connect must be deleted + // explicitly, otherwise they will never be deleted. OTOH + // the socket can be on the path of deletion already, so + // this only makes sure that the socket will be deleted, + // one way or another. 
+ if (pending_broken) + { + // XXX This somehow can cause a deadlock + // uglobal()->close(m_parent); + LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group."); + m_parent->setBrokenClosed(); + } } - // Sockets that never succeeded to connect must be deleted - // explicitly, otherwise they will never be deleted. - if (pending_broken) - { - // XXX This somehow can cause a deadlock - // uglobal()->close(m_parent); - LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group."); - m_parent->setBrokenClosed(); - } #endif } diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 31a1460fd..6cb4faeb1 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -574,6 +574,13 @@ void* srt::CSndQueue::worker(void* param) continue; } + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id()); + if (!sk.socket) + { + HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing"); + continue; + } + // pack a packet from the socket CPacket pkt; steady_clock::time_point next_send_time; @@ -929,6 +936,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst EReadStatus read_st = rst; EConnectStatus conn_st = cst; + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id); + if (!sk.socket) + { + // Socket deleted already, so stop this and proceed to the next loop. 
+ LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists"); + toRemove.push_back(*i); + continue; + } + + if (cst != CONN_RENDEZVOUS && dest_id != 0) { if (i->id != dest_id) @@ -974,14 +991,22 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst for (vector::iterator i = toRemove.begin(); i != toRemove.end(); ++i) { HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id); - // + remove(i->id); + + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id); + if (!sk.socket) + { + // This actually shall never happen, so it's a kind of paranoid check. + LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents"); + continue; + } + // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue // because the next CUDT::close will not remove it from the queue when m_bConnecting = false, // and may crash on next pass. // // TODO: maybe lock i->u->m_ConnectionLock? i->u->m_bConnecting = false; - remove(i->u->m_SocketID); // DO NOT close the socket here because in this case it might be // unable to get status from at the right moment. Also only member @@ -992,6 +1017,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst CUDT::uglobal().m_EPoll.update_events( i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); + // Make sure that the socket wasn't deleted in the meantime. + // Skip this part if it was. Note also that if the socket was + // decided to be deleted, it's already moved to m_ClosedSockets + // and should have been therefore already processed for deletion. 
+ i->u->completeBrokenConnectionDependencies(i->errorcode); } @@ -1442,6 +1472,12 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CU HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id); return worker_TryAsyncRend_OrStore(id, unit, addr); } + // Although we don't have an exclusive passing here, + // we can count on that when the socket was once present in the hash, + // it will not be deleted for at least one GC cycle. But we still need + // to maintain the object existence while it's in use. + // Note that here we are out of any locks, so m_GlobControlLock can be locked. + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->m_parent); // Found associated CUDT - process this as control or data packet // addressed to an associated socket. diff --git a/test/test_fec_rebuilding.cpp b/test/test_fec_rebuilding.cpp index 185b54e98..ac57e5265 100644 --- a/test/test_fec_rebuilding.cpp +++ b/test/test_fec_rebuilding.cpp @@ -544,7 +544,7 @@ TEST(TestFEC, ConnectionMess) SRTSOCKET la[] = { l }; SRTSOCKET a = srt_accept_bond(la, 1, 2000); - ASSERT_NE(a, SRT_ERROR); + ASSERT_NE(a, SRT_ERROR) << srt_getlasterror_str(); EXPECT_EQ(connect_res.get(), SRT_SUCCESS); // Now that the connection is established, check negotiated config