From 8befa35dad4b6c721f539b26558b96bd2c2495d4 Mon Sep 17 00:00:00 2001 From: Zhao Zhili Date: Thu, 1 Apr 2021 17:38:23 +0800 Subject: [PATCH] Replace global CUDTUnited by static local variable Static local variable has the benefits of lazy initialization and avoids the initialization order problem of global variables. --- srtcore/api.cpp | 136 +++++++++++++++++++++++----------------------- srtcore/core.cpp | 117 ++++++++++++++++++++------------------- srtcore/core.h | 5 +- srtcore/queue.cpp | 2 +- 4 files changed, 131 insertions(+), 129 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 3b8f81ef19..3ba4116cb5 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -862,7 +862,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, // static forwarder int CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq) { - return s_UDTUnited.installAcceptHook(lsn, hook, opaq); + return uglobal()->installAcceptHook(lsn, hook, opaq); } int CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq) @@ -883,7 +883,7 @@ int CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* h int CUDT::installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq) { - return s_UDTUnited.installConnectHook(lsn, hook, opaq); + return uglobal()->installConnectHook(lsn, hook, opaq); } int CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_fn* hook, void* opaq) @@ -3102,22 +3102,22 @@ void* CUDTUnited::garbageCollect(void* p) int CUDT::startup() { - return s_UDTUnited.startup(); + return uglobal()->startup(); } int CUDT::cleanup() { - return s_UDTUnited.cleanup(); + return uglobal()->cleanup(); } SRTSOCKET CUDT::socket() { - if (!s_UDTUnited.m_bGCStatus) - s_UDTUnited.startup(); + if (!uglobal()->m_bGCStatus) + uglobal()->startup(); try { - return s_UDTUnited.newSocket(); + return uglobal()->newSocket(); } catch (const CUDTException& e) { @@ -3153,24 +3153,24 @@ CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr) // This is an internal function; 'type' should be pre-checked if it has a correct value. // This doesn't have argument of GroupType due to header file conflicts. -// [[using locked(s_UDTUnited.m_GlobControlLock)]] +// [[using locked(uglobal()->m_GlobControlLock)]] CUDTGroup& CUDT::newGroup(const int type) { - const SRTSOCKET id = s_UDTUnited.generateSocketID(true); + const SRTSOCKET id = uglobal()->generateSocketID(true); // Now map the group - return s_UDTUnited.addGroup(id, SRT_GROUP_TYPE(type)).set_id(id); + return uglobal()->addGroup(id, SRT_GROUP_TYPE(type)).set_id(id); } SRTSOCKET CUDT::createGroup(SRT_GROUP_TYPE gt) { // Doing the same lazy-startup as with srt_create_socket() - if (!s_UDTUnited.m_bGCStatus) - s_UDTUnited.startup(); + if (!uglobal()->m_bGCStatus) + uglobal()->startup(); try { - srt::sync::ScopedLock globlock (s_UDTUnited.m_GlobControlLock); + srt::sync::ScopedLock globlock (uglobal()->m_GlobControlLock); return newGroup(gt).id(); // Note: potentially, after this function exits, the group // could be deleted, immediately, from a separate thread (tho @@ -3201,8 +3201,8 @@ int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group) return APIError(MJ_NOTSUP, MN_INVAL, 0); // Find the socket and the group - CUDTSocket* s = s_UDTUnited.locateSocket(socket); - CUDTUnited::GroupKeeper k (s_UDTUnited, group, s_UDTUnited.ERH_RETURN); + CUDTSocket* s = uglobal()->locateSocket(socket); + CUDTUnited::GroupKeeper k (s_UDTUnited, group, uglobal()->ERH_RETURN); if (!s || !k.group) return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -3223,7 +3223,7 @@ int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group) } ScopedLock cg (s->m_ControlLock); - ScopedLock cglob (s_UDTUnited.m_GlobControlLock); + ScopedLock cglob (uglobal()->m_GlobControlLock); if (g->closing()) return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -3247,7 +3247,7 @@ int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group) // groups. int CUDT::removeSocketFromGroup(SRTSOCKET socket) { - CUDTSocket* s = s_UDTUnited.locateSocket(socket); + CUDTSocket* s = uglobal()->locateSocket(socket); if (!s) return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -3255,13 +3255,13 @@ int CUDT::removeSocketFromGroup(SRTSOCKET socket) return APIError(MJ_NOTSUP, MN_INVAL, 0); ScopedLock cg (s->m_ControlLock); - ScopedLock glob_grd (s_UDTUnited.m_GlobControlLock); + ScopedLock glob_grd (uglobal()->m_GlobControlLock); s->removeFromGroup(false); return 0; } // [[using locked(m_ControlLock)]] -// [[using locked(CUDT::s_UDTUnited.m_GlobControlLock)]] +// [[using locked(CUDT::uglobal()->m_GlobControlLock)]] void CUDTSocket::removeFromGroup(bool broken) { CUDTGroup* g = m_GroupOf; @@ -3295,8 +3295,8 @@ SRTSOCKET CUDT::getGroupOfSocket(SRTSOCKET socket) { // Lock this for the whole function as we need the group // to persist the call. - ScopedLock glock (s_UDTUnited.m_GlobControlLock); - CUDTSocket* s = s_UDTUnited.locateSocket_LOCKED(socket); + ScopedLock glock (uglobal()->m_GlobControlLock); + CUDTSocket* s = uglobal()->locateSocket_LOCKED(socket); if (!s || !s->m_GroupOf) return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -3310,7 +3310,7 @@ int CUDT::configureGroup(SRTSOCKET groupid, const char* str) return APIError(MJ_NOTSUP, MN_INVAL, 0); } - CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, s_UDTUnited.ERH_RETURN); + CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, uglobal()->ERH_RETURN); if (!k.group) { return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -3326,7 +3326,7 @@ int CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psiz return APIError(MJ_NOTSUP, MN_INVAL, 0); } - CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, s_UDTUnited.ERH_RETURN); + CUDTUnited::GroupKeeper k (s_UDTUnited, groupid, uglobal()->ERH_RETURN); if (!k.group) { return APIError(MJ_NOTSUP, MN_INVAL, 0); @@ -3350,11 +3350,11 @@ int CUDT::bind(SRTSOCKET u, const sockaddr* name, int namelen) // This is a user error. return APIError(MJ_NOTSUP, MN_INVAL, 0); } - CUDTSocket* s = s_UDTUnited.locateSocket(u); + CUDTSocket* s = uglobal()->locateSocket(u); if (!s) return APIError(MJ_NOTSUP, MN_INVAL, 0); - return s_UDTUnited.bind(s, sa); + return uglobal()->bind(s, sa); } catch (const CUDTException& e) { @@ -3377,11 +3377,11 @@ int CUDT::bind(SRTSOCKET u, UDPSOCKET udpsock) { try { - CUDTSocket* s = s_UDTUnited.locateSocket(u); + CUDTSocket* s = uglobal()->locateSocket(u); if (!s) return APIError(MJ_NOTSUP, MN_INVAL, 0); - return s_UDTUnited.bind(s, udpsock); + return uglobal()->bind(s, udpsock); } catch (const CUDTException& e) { @@ -3403,7 +3403,7 @@ int CUDT::listen(SRTSOCKET u, int backlog) { try { - return s_UDTUnited.listen(u, backlog); + return uglobal()->listen(u, backlog); } catch (const CUDTException& e) { @@ -3425,7 +3425,7 @@ SRTSOCKET CUDT::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msT { try { - return s_UDTUnited.accept_bond(listeners, lsize, msTimeOut); + return uglobal()->accept_bond(listeners, lsize, msTimeOut); } catch (const CUDTException& e) { @@ -3450,7 +3450,7 @@ SRTSOCKET CUDT::accept(SRTSOCKET u, sockaddr* addr, int* addrlen) { try { - return s_UDTUnited.accept(u, addr, addrlen); + return uglobal()->accept(u, addr, addrlen); } catch (const CUDTException& e) { @@ -3476,7 +3476,7 @@ int CUDT::connect( { try { - return s_UDTUnited.connect(u, name, tname, namelen); + return uglobal()->connect(u, name, tname, namelen); } catch (const CUDTException& e) { @@ -3509,8 +3509,8 @@ int CUDT::connectLinks(SRTSOCKET grp, try { - CUDTUnited::GroupKeeper k(s_UDTUnited, grp, s_UDTUnited.ERH_THROW); - return s_UDTUnited.groupConnect(k.group, targets, arraysize); + CUDTUnited::GroupKeeper k(s_UDTUnited, grp, uglobal()->ERH_THROW); + return uglobal()->groupConnect(k.group, targets, arraysize); } catch (CUDTException& e) { @@ -3534,7 +3534,7 @@ int CUDT::connect( { try { - return s_UDTUnited.connect(u, name, namelen, forced_isn); + return uglobal()->connect(u, name, namelen, forced_isn); } catch (const CUDTException &e) { @@ -3556,7 +3556,7 @@ int CUDT::close(SRTSOCKET u) { try { - return s_UDTUnited.close(u); + return uglobal()->close(u); } catch (const CUDTException& e) { @@ -3574,7 +3574,7 @@ int CUDT::getpeername(SRTSOCKET u, sockaddr* name, int* namelen) { try { - s_UDTUnited.getpeername(u, name, namelen); + uglobal()->getpeername(u, name, namelen); return 0; } catch (const CUDTException& e) @@ -3593,7 +3593,7 @@ int CUDT::getsockname(SRTSOCKET u, sockaddr* name, int* namelen) { try { - s_UDTUnited.getsockname(u, name, namelen); + uglobal()->getsockname(u, name, namelen); return 0; } catch (const CUDTException& e) @@ -3621,13 +3621,13 @@ int CUDT::getsockopt( #if ENABLE_EXPERIMENTAL_BONDING if (u & SRTGROUP_MASK) { - CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW); + CUDTUnited::GroupKeeper k(s_UDTUnited, u, uglobal()->ERH_THROW); k.group->getOpt(optname, (pw_optval), (*pw_optlen)); return 0; } #endif - CUDT* udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT; + CUDT* udt = uglobal()->locateSocket(u, uglobal()->ERH_THROW)->m_pUDT; udt->getOpt(optname, (pw_optval), (*pw_optlen)); return 0; } @@ -3653,13 +3653,13 @@ int CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* optval, #if ENABLE_EXPERIMENTAL_BONDING if (u & SRTGROUP_MASK) { - CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW); + CUDTUnited::GroupKeeper k(s_UDTUnited, u, uglobal()->ERH_THROW); k.group->setOpt(optname, optval, optlen); return 0; } #endif - CUDT* udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT; + CUDT* udt = uglobal()->locateSocket(u, uglobal()->ERH_THROW)->m_pUDT; udt->setOpt(optname, optval, optlen); return 0; } @@ -3702,12 +3702,12 @@ int CUDT::sendmsg2( #if ENABLE_EXPERIMENTAL_BONDING if (u & SRTGROUP_MASK) { - CUDTUnited::GroupKeeper k (s_UDTUnited, u, s_UDTUnited.ERH_THROW); + CUDTUnited::GroupKeeper k (s_UDTUnited, u, uglobal()->ERH_THROW); return k.group->send(buf, len, (w_m)); } #endif - return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m)); + return uglobal()->locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m)); } catch (const CUDTException& e) { @@ -3747,12 +3747,12 @@ int CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m) #if ENABLE_EXPERIMENTAL_BONDING if (u & SRTGROUP_MASK) { - CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW); + CUDTUnited::GroupKeeper k(s_UDTUnited, u, uglobal()->ERH_THROW); return k.group->recv(buf, len, (w_m)); } #endif - return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m)); + return uglobal()->locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m)); } catch (const CUDTException& e) { @@ -3771,7 +3771,7 @@ int64_t CUDT::sendfile( { try { - CUDT* udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT; + CUDT* udt = uglobal()->locateSocket(u, uglobal()->ERH_THROW)->m_pUDT; return udt->sendfile(ifs, offset, size, block); } catch (const CUDTException& e) @@ -3795,7 +3795,7 @@ int64_t CUDT::recvfile( { try { - return s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block); + return uglobal()->locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block); } catch (const CUDTException& e) { @@ -3823,7 +3823,7 @@ int CUDT::select( try { - return s_UDTUnited.select(readfds, writefds, exceptfds, timeout); + return uglobal()->select(readfds, writefds, exceptfds, timeout); } catch (const CUDTException& e) { @@ -3855,7 +3855,7 @@ int CUDT::selectEx( try { - return s_UDTUnited.selectEx(fds, readfds, writefds, exceptfds, msTimeOut); + return uglobal()->selectEx(fds, readfds, writefds, exceptfds, msTimeOut); } catch (const CUDTException& e) { @@ -3877,7 +3877,7 @@ int CUDT::epoll_create() { try { - return s_UDTUnited.epoll_create(); + return uglobal()->epoll_create(); } catch (const CUDTException& e) { @@ -3895,7 +3895,7 @@ int CUDT::epoll_clear_usocks(int eid) { try { - return s_UDTUnited.epoll_clear_usocks(eid); + return uglobal()->epoll_clear_usocks(eid); } catch (const CUDTException& e) { @@ -3913,7 +3913,7 @@ int CUDT::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events) { try { - return s_UDTUnited.epoll_add_usock(eid, u, events); + return uglobal()->epoll_add_usock(eid, u, events); } catch (const CUDTException& e) { @@ -3931,7 +3931,7 @@ int CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events) { try { - return s_UDTUnited.epoll_add_ssock(eid, s, events); + return uglobal()->epoll_add_ssock(eid, s, events); } catch (const CUDTException& e) { @@ -3950,7 +3950,7 @@ int CUDT::epoll_update_usock( { try { - return s_UDTUnited.epoll_add_usock(eid, u, events); + return uglobal()->epoll_add_usock(eid, u, events); } catch (const CUDTException& e) { @@ -3969,7 +3969,7 @@ int CUDT::epoll_update_ssock( { try { - return s_UDTUnited.epoll_update_ssock(eid, s, events); + return uglobal()->epoll_update_ssock(eid, s, events); } catch (const CUDTException& e) { @@ -3988,7 +3988,7 @@ int CUDT::epoll_remove_usock(const int eid, const SRTSOCKET u) { try { - return s_UDTUnited.epoll_remove_usock(eid, u); + return uglobal()->epoll_remove_usock(eid, u); } catch (const CUDTException& e) { @@ -4006,7 +4006,7 @@ int CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s) { try { - return s_UDTUnited.epoll_remove_ssock(eid, s); + return uglobal()->epoll_remove_ssock(eid, s); } catch (const CUDTException& e) { @@ -4030,7 +4030,7 @@ int CUDT::epoll_wait( { try { - return s_UDTUnited.epoll_ref().wait( + return uglobal()->epoll_ref().wait( eid, readfds, writefds, msTimeOut, lrfds, lwfds); } catch (const CUDTException& e) @@ -4053,7 +4053,7 @@ int CUDT::epoll_uwait( { try { - return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut); + return uglobal()->epoll_uwait(eid, fdsSet, fdsSize, msTimeOut); } catch (const CUDTException& e) { @@ -4073,7 +4073,7 @@ int32_t CUDT::epoll_set( { try { - return s_UDTUnited.epoll_set(eid, flags); + return uglobal()->epoll_set(eid, flags); } catch (const CUDTException& e) { @@ -4091,7 +4091,7 @@ int CUDT::epoll_release(const int eid) { try { - return s_UDTUnited.epoll_release(eid); + return uglobal()->epoll_release(eid); } catch (const CUDTException& e) { @@ -4119,7 +4119,7 @@ int CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instantaneous try { - CUDT* udt = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT; + CUDT* udt = uglobal()->locateSocket(u, uglobal()->ERH_THROW)->m_pUDT; udt->bstats(perf, clear, instantaneous); return 0; } @@ -4140,7 +4140,7 @@ int CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear) { try { - CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW); + CUDTUnited::GroupKeeper k(s_UDTUnited, u, uglobal()->ERH_THROW); k.group->bstatsSocket(perf, clear); return 0; } @@ -4163,7 +4163,7 @@ CUDT* CUDT::getUDTHandle(SRTSOCKET u) { try { - return s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT; + return uglobal()->locateSocket(u, uglobal()->ERH_THROW)->m_pUDT; } catch (const CUDTException& e) { @@ -4182,8 +4182,8 @@ CUDT* CUDT::getUDTHandle(SRTSOCKET u) vector CUDT::existingSockets() { vector out; - for (CUDTUnited::sockets_t::iterator i = s_UDTUnited.m_Sockets.begin(); - i != s_UDTUnited.m_Sockets.end(); ++i) + for (CUDTUnited::sockets_t::iterator i = uglobal()->m_Sockets.begin(); + i != uglobal()->m_Sockets.end(); ++i) { out.push_back(i->first); } @@ -4197,11 +4197,11 @@ SRT_SOCKSTATUS CUDT::getsockstate(SRTSOCKET u) #if ENABLE_EXPERIMENTAL_BONDING if (isgroup(u)) { - CUDTUnited::GroupKeeper k(s_UDTUnited, u, s_UDTUnited.ERH_THROW); + CUDTUnited::GroupKeeper k(s_UDTUnited, u, uglobal()->ERH_THROW); return k.group->getStatus(); } #endif - return s_UDTUnited.getStatus(u); + return uglobal()->getStatus(u); } catch (const CUDTException& e) { diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 795f0a727c..e7f55cf3b8 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -83,8 +83,6 @@ using namespace srt; using namespace srt::sync; using namespace srt_logging; -CUDTUnited CUDT::s_UDTUnited; - const SRTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK; const int UDT::ERROR = CUDT::ERROR; @@ -214,6 +212,11 @@ struct SrtOptionAction srt_options_action; +CUDTUnited* CUDT::uglobal() { + static CUDTUnited instance; + return &instance; +} + void CUDT::construct() { m_pSndBuffer = NULL; @@ -499,7 +502,7 @@ void CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen) break; case SRTO_STATE: - *(int32_t *)optval = s_UDTUnited.getStatus(m_SocketID); + *(int32_t *)optval = uglobal()->getStatus(m_SocketID); optlen = sizeof(int32_t); break; @@ -1257,7 +1260,7 @@ size_t CUDT::fillHsExtConfigString(uint32_t* pcmdspec, int cmd, const string& st #if ENABLE_EXPERIMENTAL_BONDING // [[using locked(m_parent->m_ControlLock)]] -// [[using locked(s_UDTUnited.m_GlobControlLock)]] +// [[using locked(uglobal()->m_GlobControlLock)]] size_t CUDT::fillHsExtGroup(uint32_t* pcmdspec) { SRT_ASSERT(m_parent->m_GroupOf != NULL); @@ -1679,7 +1682,7 @@ bool CUDT::createSrtHandshake( if (have_group) { // NOTE: See information about mutex ordering in api.h - ScopedLock gdrg (s_UDTUnited.m_GlobControlLock); + ScopedLock gdrg (uglobal()->m_GlobControlLock); if (!m_parent->m_GroupOf) { // This may only happen if since last check of m_GroupOf pointer the socket was removed @@ -2941,7 +2944,7 @@ bool CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_ATR_UN return false; } - ScopedLock guard_group_existence (s_UDTUnited.m_GlobControlLock); + ScopedLock guard_group_existence (uglobal()->m_GlobControlLock); if (m_SrtHsSide == HSD_INITIATOR) { @@ -3050,7 +3053,7 @@ bool CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_ATR_UN // NOTE: This function is called only in one place and it's done // exclusively on the listener side (HSD_RESPONDER, HSv5+). -// [[using locked(s_UDTUnited.m_GlobControlLock)]] +// [[using locked(uglobal()->m_GlobControlLock)]] SRTSOCKET CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint32_t link_flags) { // Note: This function will lock pg->m_GroupLock! @@ -3063,7 +3066,7 @@ SRTSOCKET CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint32_t l // it right now so there's no need to lock s->m_ControlLock. // Check if there exists a group that this one is a peer of. - CUDTGroup* gp = s_UDTUnited.findPeerGroup_LOCKED(peergroup); + CUDTGroup* gp = uglobal()->findPeerGroup_LOCKED(peergroup); bool was_empty = true; if (gp) { @@ -3094,7 +3097,7 @@ SRTSOCKET CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint32_t l if (!gp->applyFlags(link_flags, m_SrtHsSide)) { // Wrong settings. Must reject. Delete group. - s_UDTUnited.deleteGroup_LOCKED(gp); + uglobal()->deleteGroup_LOCKED(gp); return -1; } @@ -4443,7 +4446,7 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE { #if ENABLE_EXPERIMENTAL_BONDING - ScopedLock cl (s_UDTUnited.m_GlobControlLock); + ScopedLock cl (uglobal()->m_GlobControlLock); CUDTGroup* g = m_parent->m_GroupOf; if (g) { @@ -4488,7 +4491,7 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE // the socket could have been started removal before this function // has started. Do a sanity check before you continue with the // connection process. - CUDTSocket* s = s_UDTUnited.locateSocket(m_SocketID); + CUDTSocket* s = uglobal()->locateSocket(m_SocketID); if (s) { // The socket could be closed at this very moment. @@ -4545,7 +4548,7 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE //int token = -1; #if ENABLE_EXPERIMENTAL_BONDING { - ScopedLock cl (s_UDTUnited.m_GlobControlLock); + ScopedLock cl (uglobal()->m_GlobControlLock); CUDTGroup* g = m_parent->m_GroupOf; if (g) { @@ -4574,7 +4577,7 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE s->m_Status = SRTS_CONNECTED; // acknowledde any waiting epolls to write - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_CONNECT, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_CONNECT, true); CGlobEvent::triggerEvent(); @@ -5019,7 +5022,7 @@ void *CUDT::tsbpd(void *param) // which will ensure that the group will not be physically // deleted until this thread exits. // NOTE: DO NOT LEAD TO EVER CANCEL THE THREAD!!! - CUDTUnited::GroupKeeper gkeeper (self->s_UDTUnited, self->m_parent); + CUDTUnited::GroupKeeper gkeeper (self->s_UDTUnited(), self->m_parent); #endif UniqueLock recv_lock (self->m_RecvLock); @@ -5128,7 +5131,7 @@ void *CUDT::tsbpd(void *param) /* * Set EPOLL_IN to wakeup any thread waiting on epoll */ - self->s_UDTUnited.m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true); + self->uglobal()->m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true); #if ENABLE_EXPERIMENTAL_BONDING // If this is NULL, it means: // - the socket never was a group member @@ -5400,7 +5403,7 @@ void CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, { #if ENABLE_EXPERIMENTAL_BONDING - ScopedLock cl (s_UDTUnited.m_GlobControlLock); + ScopedLock cl (uglobal()->m_GlobControlLock); CUDTGroup* g = m_parent->m_GroupOf; if (g) { @@ -5756,13 +5759,13 @@ bool CUDT::closeInternal() // Make a copy under a lock because other thread might access it // at the same time. - enterCS(s_UDTUnited.m_EPoll.m_EPollLock); + enterCS(uglobal()->m_EPoll.m_EPollLock); set epollid = m_sPollID; - leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); + leaveCS(uglobal()->m_EPoll.m_EPollLock); // trigger any pending IO events. HLOGC(smlog.Debug, log << "close: SETTING ERR readiness on E" << Printable(epollid) << " of @" << m_SocketID); - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_ERR, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_ERR, true); // then remove itself from all epoll monitoring int no_events = 0; for (set::iterator i = epollid.begin(); i != epollid.end(); ++i) @@ -5770,7 +5773,7 @@ bool CUDT::closeInternal() HLOGC(smlog.Debug, log << "close: CLEARING subscription on E" << (*i) << " of @" << m_SocketID); try { - s_UDTUnited.m_EPoll.update_usock(*i, m_SocketID, &no_events); + uglobal()->m_EPoll.update_usock(*i, m_SocketID, &no_events); } catch (...) { @@ -5787,9 +5790,9 @@ bool CUDT::closeInternal() // IMPORTANT: there's theoretically little time between setting ERR readiness // and unsubscribing, however if there's an application waiting on this event, // it should be informed before this below instruction locks the epoll mutex. - enterCS(s_UDTUnited.m_EPoll.m_EPollLock); + enterCS(uglobal()->m_EPoll.m_EPollLock); m_sPollID.clear(); - leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); + leaveCS(uglobal()->m_EPoll.m_EPollLock); // XXX What's this, could any of the above actions make it !m_bOpened? if (!m_bOpened) @@ -5975,7 +5978,7 @@ int CUDT::receiveBuffer(char *data, int len) if (!m_pRcvBuffer->isRcvDataReady()) { // read is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); } if ((res <= 0) && (m_config.iRcvTimeOut >= 0)) @@ -6362,7 +6365,7 @@ int CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) if (sndBuffersLeft() < 1) // XXX Not sure if it should test if any space in the buffer, or as requried. { // write is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, false); } } @@ -6485,7 +6488,7 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep if (!m_pRcvBuffer->isRcvDataReady()) { // read is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); } if (res == 0) @@ -6526,7 +6529,7 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep } // Shut up EPoll if no more messages in non-blocking mode - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); // Forced to return 0 instead of throwing exception, in case of AGAIN/READ if (!by_exception) return 0; @@ -6547,7 +6550,7 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep } // Shut up EPoll if no more messages in non-blocking mode - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); // After signaling the tsbpd for ready data, report the bandwidth. #if ENABLE_HEAVY_LOGGING @@ -6666,7 +6669,7 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep } // Shut up EPoll if no more messages in non-blocking mode - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); } // Unblock when required @@ -6794,7 +6797,7 @@ int64_t CUDT::sendfile(fstream &ifs, int64_t &offset, int64_t size, int block) if (sndBuffersLeft() <= 0) { // write is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, false); } } @@ -6918,7 +6921,7 @@ int64_t CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int block) if (!m_pRcvBuffer->isRcvDataReady()) { // read is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false); } return size - torecv; @@ -7585,7 +7588,7 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // can be either never set, already reset, or ever set // and possibly dangling. The re-check after lock eliminates // the dangling case. - ScopedLock glock (s_UDTUnited.m_GlobControlLock); + ScopedLock glock (uglobal()->m_GlobControlLock); // Note that updateLatestRcv will lock m_GroupOf->m_GroupLock, // but this is an intended order. @@ -7637,13 +7640,13 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // (4) receive thread: receive data and set SRT_EPOLL_IN to true // (5) user thread: set SRT_EPOLL_IN to false // 4. so , m_RecvLock must be used here to protect epoll event - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); } #if ENABLE_EXPERIMENTAL_BONDING if (m_parent->m_GroupOf) { // See above explanation for double-checking - ScopedLock glock (s_UDTUnited.m_GlobControlLock); + ScopedLock glock (uglobal()->m_GlobControlLock); if (m_parent->m_GroupOf) { @@ -7793,7 +7796,7 @@ void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) m_pSndBuffer->ackData(offset); // acknowledde any waiting epolls to write - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true); CGlobEvent::triggerEvent(); } @@ -7802,7 +7805,7 @@ void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) { // m_RecvAckLock is ordered AFTER m_GlobControlLock, so this can only // be done now that m_RecvAckLock is unlocked. - ScopedLock glock (s_UDTUnited.m_GlobControlLock); + ScopedLock glock (uglobal()->m_GlobControlLock); if (m_parent->m_GroupOf) { HLOGC(inlog.Debug, log << "ACK: acking group sender buffer for #" << msgno_at_last_acked_seq); @@ -7951,7 +7954,7 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point #if ENABLE_EXPERIMENTAL_BONDING if (m_parent->m_GroupOf) { - ScopedLock glock (s_UDTUnited.m_GlobControlLock); + ScopedLock glock (uglobal()->m_GlobControlLock); if (m_parent->m_GroupOf) { // Will apply m_GroupLock, ordered after m_GlobControlLock. @@ -8600,7 +8603,7 @@ void CUDT::updateAfterSrtHandshake(int hsv) if (m_parent->m_GroupOf) { - ScopedLock glock (s_UDTUnited.m_GlobControlLock); + ScopedLock glock (uglobal()->m_GlobControlLock); grpspec = m_parent->m_GroupOf ? " group=$" + Sprint(m_parent->m_GroupOf->id()) : string(); @@ -9041,7 +9044,7 @@ void CUDT::processClose() // Signal the sender and recver if they are waiting for data. releaseSynch(); // Unblock any call so they learn the connection_broken error - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_ERR, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_ERR, true); HLOGP(smlog.Debug, "processClose: triggering timer event to spread the bad news"); CGlobEvent::triggerEvent(); @@ -10333,7 +10336,7 @@ int CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) { int error = SRT_REJ_UNKNOWN; CUDT* acpu = NULL; - int result = s_UDTUnited.newConnection(m_SocketID, addr, packet, (hs), (error), (acpu)); + int result = uglobal()->newConnection(m_SocketID, addr, packet, (hs), (error), (acpu)); // This is listener - m_RejectReason need not be set // because listener has no functionality of giving the app @@ -10458,7 +10461,7 @@ int CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) // Note: not using SRT_EPOLL_CONNECT symbol because this is a procedure // executed for the accepted socket. - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true); } } LOGC(cnlog.Note, log << "listen ret: " << hs.m_iReqType << " - " << RequestTypeStr(hs.m_iReqType)); @@ -10757,7 +10760,7 @@ void CUDT::checkTimers() #if ENABLE_EXPERIMENTAL_BONDING if (m_parent->m_GroupOf) { - ScopedLock glock (s_UDTUnited.m_GlobControlLock); + ScopedLock glock (uglobal()->m_GlobControlLock); if (m_parent->m_GroupOf) { // Pass socket ID because it's about changing group socket data @@ -10777,7 +10780,7 @@ void CUDT::updateBrokenConnection() m_bClosing = true; releaseSynch(); // app can call any UDT API to learn the connection_broken error - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); CGlobEvent::triggerEvent(); } @@ -10788,7 +10791,7 @@ void CUDT::completeBrokenConnectionDependencies(int errorcode) #if ENABLE_EXPERIMENTAL_BONDING bool pending_broken = false; { - ScopedLock guard_group_existence (s_UDTUnited.m_GlobControlLock); + ScopedLock guard_group_existence (uglobal()->m_GlobControlLock); if (m_parent->m_GroupOf) { token = m_parent->m_GroupMemberData->token; @@ -10820,7 +10823,7 @@ void CUDT::completeBrokenConnectionDependencies(int errorcode) // existence of the group will not be changed during // the operation. The attempt of group deletion will // have to wait until this operation completes. - ScopedLock lock(s_UDTUnited.m_GlobControlLock); + ScopedLock lock(uglobal()->m_GlobControlLock); CUDTGroup* pg = m_parent->m_GroupOf; if (pg) { @@ -10834,7 +10837,7 @@ void CUDT::completeBrokenConnectionDependencies(int errorcode) if (pending_broken) { // XXX This somehow can cause a deadlock - // s_UDTUnited.close(m_parent); + // uglobal()->close(m_parent); m_parent->setBrokenClosed(); } #endif @@ -10842,9 +10845,9 @@ void CUDT::completeBrokenConnectionDependencies(int errorcode) void CUDT::addEPoll(const int eid) { - enterCS(s_UDTUnited.m_EPoll.m_EPollLock); + enterCS(uglobal()->m_EPoll.m_EPollLock); m_sPollID.insert(eid); - leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); + leaveCS(uglobal()->m_EPoll.m_EPollLock); if (!stillConnected()) return; @@ -10852,13 +10855,13 @@ void CUDT::addEPoll(const int eid) enterCS(m_RecvLock); if (m_pRcvBuffer->isRcvDataReady()) { - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); } leaveCS(m_RecvLock); if (m_config.iSndBufSize > m_pSndBuffer->getCurrBufSize()) { - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true); + uglobal()->m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_OUT, true); } } @@ -10868,14 +10871,14 @@ void CUDT::removeEPollEvents(const int eid) // since this happens after the epoll ID has been removed, they cannot be set again set remove; remove.insert(eid); - s_UDTUnited.m_EPoll.update_events(m_SocketID, remove, SRT_EPOLL_IN | SRT_EPOLL_OUT, false); + uglobal()->m_EPoll.update_events(m_SocketID, remove, SRT_EPOLL_IN | SRT_EPOLL_OUT, false); } void CUDT::removeEPollID(const int eid) { - enterCS(s_UDTUnited.m_EPoll.m_EPollLock); + enterCS(uglobal()->m_EPoll.m_EPollLock); m_sPollID.erase(eid); - leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); + leaveCS(uglobal()->m_EPoll.m_EPollLock); } void CUDT::ConnectSignal(ETransmissionEvent evt, EventSlot sl) @@ -10904,7 +10907,7 @@ void CUDT::EmitSignal(ETransmissionEvent tev, EventVariant var) int CUDT::getsndbuffer(SRTSOCKET u, size_t *blocks, size_t *bytes) { - CUDTSocket *s = s_UDTUnited.locateSocket(u); + CUDTSocket *s = uglobal()->locateSocket(u); if (!s || !s->m_pUDT) return -1; @@ -10927,7 +10930,7 @@ int CUDT::getsndbuffer(SRTSOCKET u, size_t *blocks, size_t *bytes) int CUDT::rejectReason(SRTSOCKET u) { - CUDTSocket* s = s_UDTUnited.locateSocket(u); + CUDTSocket* s = uglobal()->locateSocket(u); if (!s || !s->m_pUDT) return SRT_REJ_UNKNOWN; @@ -10936,7 +10939,7 @@ int CUDT::rejectReason(SRTSOCKET u) int CUDT::rejectReason(SRTSOCKET u, int value) { - CUDTSocket* s = s_UDTUnited.locateSocket(u); + CUDTSocket* s = uglobal()->locateSocket(u); if (!s || !s->m_pUDT) return APIError(MJ_NOTSUP, MN_SIDINVAL); @@ -10949,7 +10952,7 @@ int CUDT::rejectReason(SRTSOCKET u, int value) int64_t CUDT::socketStartTime(SRTSOCKET u) { - CUDTSocket* s = s_UDTUnited.locateSocket(u); + CUDTSocket* s = uglobal()->locateSocket(u); if (!s || !s->m_pUDT) return APIError(MJ_NOTSUP, MN_SIDINVAL); @@ -11070,7 +11073,7 @@ void CUDT::handleKeepalive(const char* /*data*/, size_t /*size*/) // existence of the group will not be changed during // the operation. The attempt of group deletion will // have to wait until this operation completes. - ScopedLock lock(s_UDTUnited.m_GlobControlLock); + ScopedLock lock(uglobal()->m_GlobControlLock); CUDTGroup* pg = m_parent->m_GroupOf; if (pg) { diff --git a/srtcore/core.h b/srtcore/core.h index 982408c8bc..8e0849c8f6 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -413,8 +413,8 @@ class CUDT // a different channel. void skipIncoming(int32_t seq); - // For SRT_tsbpdLoop - static CUDTUnited* uglobal() { return &s_UDTUnited; } // needed by tsbpdLoop + static CUDTUnited* uglobal(); // UDT global management base + std::set& pollset() { return m_sPollID; } CSrtConfig m_config; @@ -691,7 +691,6 @@ class CUDT static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt); static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt); - static CUDTUnited s_UDTUnited; // UDT global management base private: // Identification CUDTSocket* const m_parent; // Temporary, until the CUDTSocket class is merged with CUDT diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 61400df22e..8f51f0c741 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1108,7 +1108,7 @@ void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, con // be normally closed by the application, after it is done with them. // app can call any UDT API to learn the connection_broken error - CUDT::s_UDTUnited.m_EPoll.update_events(i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); + CUDT::uglobal()->m_EPoll.update_events(i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); i->u->completeBrokenConnectionDependencies(i->errorcode); }