Skip to content

Commit

Permalink
[core] Refactored member pointer: now raw pointer to socket data (#1696)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethouris authored Dec 18, 2020
1 parent ba883c3 commit c12e619
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 178 deletions.
102 changes: 50 additions & 52 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ extern LogConfig srt_logger_config;
void CUDTSocket::construct()
{
#if ENABLE_EXPERIMENTAL_BONDING
m_IncludedGroup = NULL;
m_IncludedIter = CUDTGroup::gli_NULL();
m_GroupOf = NULL;
m_GroupMemberData = NULL;
#endif
setupMutex(m_AcceptLock, "Accept");
setupCond(m_AcceptCond, "Accept");
Expand Down Expand Up @@ -677,30 +677,28 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
error = 2;
}

// The access to m_IncludedGroup should be also protected, as the group
// The access to m_GroupOf should be also protected, as the group
// could be requested deletion in the meantime. This will hold any possible
// removal from group and resetting m_IncludedGroup field.
// removal from group and resetting m_GroupOf field.

#if ENABLE_EXPERIMENTAL_BONDING
if (ns->m_IncludedGroup)
if (ns->m_GroupOf)
{
// XXX this might require another check of group type.
// For redundancy group, at least, update the status in the group
CUDTGroup* g = ns->m_IncludedGroup;
CUDTGroup* g = ns->m_GroupOf;
ScopedLock glock (g->m_GroupLock);
if (g->m_bClosing)
{
error = 1; // "INTERNAL REJECTION"
goto ERR_ROLLBACK;
}

CUDTGroup::gli_t gi;

// Check if this is the first socket in the group.
// If so, give it up to accept, otherwise just do nothing
// The client will be informed about the newly added connection at the
// first moment when attempting to get the group status.
for (gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
for (CUDTGroup::gli_t gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
{
if (gi->laststatus == SRTS_CONNECTED)
{
Expand All @@ -713,13 +711,13 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,

// Update the status in the group so that the next
// operation can include the socket in the group operation.
gi = ns->m_IncludedIter;
CUDTGroup::SocketData* gm = ns->m_GroupMemberData;

HLOGC(cnlog.Debug, log << "newConnection(GROUP): Socket @" << ns->m_SocketID << " BELONGS TO $" << g->id()
<< " - will " << (should_submit_to_accept? "" : "NOT ") << "report in accept");
gi->sndstate = SRT_GST_IDLE;
gi->rcvstate = SRT_GST_IDLE;
gi->laststatus = SRTS_CONNECTED;
gm->sndstate = SRT_GST_IDLE;
gm->rcvstate = SRT_GST_IDLE;
gm->laststatus = SRTS_CONNECTED;

if (!g->m_bConnected)
{
Expand Down Expand Up @@ -845,9 +843,9 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
ScopedLock cg(m_GlobControlLock);

#if ENABLE_EXPERIMENTAL_BONDING
if (ns->m_IncludedGroup)
if (ns->m_GroupOf)
{
HLOGC(smlog.Debug, log << "@" << ns->m_SocketID << " IS MEMBER OF $" << ns->m_IncludedGroup->id() << " - REMOVING FROM GROUP");
HLOGC(smlog.Debug, log << "@" << ns->m_SocketID << " IS MEMBER OF $" << ns->m_GroupOf->id() << " - REMOVING FROM GROUP");
ns->removeFromGroup(true);
}
#endif
Expand Down Expand Up @@ -1178,16 +1176,16 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_
// and the already accepted socket has successfully joined
// the mirror group. If so, RETURN THE GROUP ID, not the socket ID.
#if ENABLE_EXPERIMENTAL_BONDING
if (ls->m_pUDT->m_OPT_GroupConnect == 1 && s->m_IncludedGroup)
if (ls->m_pUDT->m_OPT_GroupConnect == 1 && s->m_GroupOf)
{
// Put a lock to protect the group against accidental deletion
// in the meantime.
ScopedLock glock (m_GlobControlLock);
// Check again; it's unlikely to happen, but
// it's a theoretically possible scenario
if (s->m_IncludedGroup)
if (s->m_GroupOf)
{
u = s->m_IncludedGroup->m_GroupID;
u = s->m_GroupOf->m_GroupID;
s->core().m_OPT_GroupConnect = 1; // should be derived from ls, but make sure
}
else
Expand Down Expand Up @@ -1495,9 +1493,9 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar

if (proceed)
{
CUDTGroup::gli_t f = g.add(data);
ns->m_IncludedIter = f;
ns->m_IncludedGroup = &g;
CUDTGroup::SocketData* f = g.add(data);
ns->m_GroupMemberData = f;
ns->m_GroupOf = &g;
f->weight = targets[tii].weight;
LOGC(aclog.Note, log << "srt_connect_group: socket @" << sid << " added to group $" << g.m_GroupID);
}
Expand Down Expand Up @@ -1608,7 +1606,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
// set busy, so it won't be deleted, even if it was requested to be closed.
ScopedLock grd (g.m_GroupLock);

if (!ns->m_IncludedGroup)
if (!ns->m_GroupOf)
{
// The situation could get changed between the unlock and lock of m_GroupLock.
// This must be checked again.
Expand All @@ -1621,8 +1619,8 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
continue;
}

// If m_IncludedGroup is not NULL, the m_IncludedIter is still valid.
CUDTGroup::gli_t f = ns->m_IncludedIter;
// If m_GroupOf is not NULL, the m_IncludedIter is still valid.
CUDTGroup::SocketData* f = ns->m_GroupMemberData;

// Now under a group lock, we need to make sure the group isn't being closed
// in order not to add a socket to a dead group.
Expand Down Expand Up @@ -1957,11 +1955,11 @@ void CUDTUnited::deleteGroup(CUDTGroup* g)
i != m_Sockets.end(); ++ i)
{
CUDTSocket* s = i->second;
if (s->m_IncludedGroup == g)
if (s->m_GroupOf == g)
{
HLOGC(smlog.Debug, log << "deleteGroup: IPE: existing @" << s->m_SocketID << " points to a dead group!");
s->m_IncludedGroup = NULL;
s->m_IncludedIter = CUDTGroup::gli_NULL();
s->m_GroupOf = NULL;
s->m_GroupMemberData = NULL;
}
}

Expand All @@ -1971,11 +1969,11 @@ void CUDTUnited::deleteGroup(CUDTGroup* g)
i != m_ClosedSockets.end(); ++ i)
{
CUDTSocket* s = i->second;
if (s->m_IncludedGroup == g)
if (s->m_GroupOf == g)
{
HLOGC(smlog.Debug, log << "deleteGroup: IPE: closed @" << s->m_SocketID << " points to a dead group!");
s->m_IncludedGroup = NULL;
s->m_IncludedIter = CUDTGroup::gli_NULL();
s->m_GroupOf = NULL;
s->m_GroupMemberData = NULL;
}
}
}
Expand Down Expand Up @@ -2048,9 +2046,9 @@ int CUDTUnited::close(CUDTSocket* s)
s->setClosed();

#if ENABLE_EXPERIMENTAL_BONDING
if (s->m_IncludedGroup)
if (s->m_GroupOf)
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_IncludedGroup->id() << " - REMOVING FROM GROUP");
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
s->removeFromGroup(true);
}
#endif
Expand Down Expand Up @@ -2563,7 +2561,7 @@ CUDTGroup* CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh)
CUDTGroup* CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
{
ScopedLock cg (m_GlobControlLock);
CUDTGroup* g = s->m_IncludedGroup;
CUDTGroup* g = s->m_GroupOf;
if (!g)
return NULL;

Expand Down Expand Up @@ -2672,9 +2670,9 @@ void CUDTUnited::checkBrokenSockets()
}

#if ENABLE_EXPERIMENTAL_BONDING
if (s->m_IncludedGroup)
if (s->m_GroupOf)
{
LOGC(smlog.Note, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_IncludedGroup->id() << " - REMOVING FROM GROUP");
LOGC(smlog.Note, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
s->removeFromGroup(true);
}
#endif
Expand Down Expand Up @@ -2757,9 +2755,9 @@ void CUDTUnited::removeSocket(const SRTSOCKET u)
CUDTSocket* const s = i->second;

#if ENABLE_EXPERIMENTAL_BONDING
if (s->m_IncludedGroup)
if (s->m_GroupOf)
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_IncludedGroup->id() << " - REMOVING FROM GROUP");
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
s->removeFromGroup(true);
}
#endif
Expand Down Expand Up @@ -3101,9 +3099,9 @@ void* CUDTUnited::garbageCollect(void* p)
s->breakSocket_LOCKED();

#if ENABLE_EXPERIMENTAL_BONDING
if (s->m_IncludedGroup)
if (s->m_GroupOf)
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_IncludedGroup->id() << " (IPE?) - REMOVING FROM GROUP");
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " (IPE?) - REMOVING FROM GROUP");
s->removeFromGroup(false);
}
#endif
Expand Down Expand Up @@ -3262,7 +3260,7 @@ int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

// Check if the socket is already IN SOME GROUP.
if (s->m_IncludedGroup)
if (s->m_GroupOf)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

CUDTGroup* g = k.group;
Expand All @@ -3282,17 +3280,17 @@ int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

// Check if the socket already is in the group
CUDTGroup::gli_t f = g->find(socket);
if (f != CUDTGroup::gli_NULL())
CUDTGroup::SocketData* f;
if (g->contains(socket, (f)))
{
// XXX This is internal error. Report it, but continue
LOGC(aclog.Error, log << "IPE (non-fatal): the socket is in the group, but has no clue about it!");
s->m_IncludedIter = f;
s->m_IncludedGroup = g;
s->m_GroupMemberData = f;
s->m_GroupOf = g;
return 0;
}
s->m_IncludedIter = g->add(g->prepareData(s));
s->m_IncludedGroup = g;
s->m_GroupMemberData = g->add(g->prepareData(s));
s->m_GroupOf = g;

return 0;
}
Expand All @@ -3305,7 +3303,7 @@ int CUDT::removeSocketFromGroup(SRTSOCKET socket)
if (!s)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

if (!s->m_IncludedGroup)
if (!s->m_GroupOf)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

ScopedLock cg (s->m_ControlLock);
Expand All @@ -3318,16 +3316,16 @@ int CUDT::removeSocketFromGroup(SRTSOCKET socket)
// [[using locked(CUDT::s_UDTUnited.m_GlobControlLock)]]
void CUDTSocket::removeFromGroup(bool broken)
{
CUDTGroup* g = m_IncludedGroup;
CUDTGroup* g = m_GroupOf;
if (g)
{
// Reset group-related fields immediately. They won't be accessed
// in the below calls, while the iterator will be invalidated for
// a short moment between removal from the group container and the end,
// while the GroupLock would be already taken out. It is safer to reset
// it to a NULL iterator before removal.
m_IncludedGroup = NULL;
m_IncludedIter = CUDTGroup::gli_NULL();
m_GroupOf = NULL;
m_GroupMemberData = NULL;

bool still_have = g->remove(m_SocketID);
if (broken)
Expand All @@ -3351,10 +3349,10 @@ SRTSOCKET CUDT::getGroupOfSocket(SRTSOCKET socket)
// to persist the call.
ScopedLock glock (s_UDTUnited.m_GlobControlLock);
CUDTSocket* s = s_UDTUnited.locateSocket_LOCKED(socket);
if (!s || !s->m_IncludedGroup)
if (!s || !s->m_GroupOf)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

return s->m_IncludedGroup->id();
return s->m_GroupOf->id();
}

int CUDT::configureGroup(SRTSOCKET groupid, const char* str)
Expand Down
11 changes: 6 additions & 5 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class CUDTSocket
, m_ListenSocket(0)
, m_PeerID(0)
#if ENABLE_EXPERIMENTAL_BONDING
, m_IncludedGroup()
, m_GroupMemberData()
, m_GroupOf()
#endif
, m_iISN(0)
, m_pUDT(NULL)
Expand All @@ -110,16 +111,16 @@ class CUDTSocket
/// 1 second (see CUDTUnited::checkBrokenSockets()).
srt::sync::steady_clock::time_point m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket

SRTSOCKET m_SocketID; //< socket ID
SRTSOCKET m_ListenSocket; //< ID of the listener socket; 0 means this is an independent socket

SRTSOCKET m_PeerID; //< peer socket ID
#if ENABLE_EXPERIMENTAL_BONDING
CUDTGroup::gli_t m_IncludedIter; //< Container's iterator of the group to which it belongs, or gli_NULL() if it isn't
CUDTGroup* m_IncludedGroup; //< Group this socket is a member of, or NULL if it isn't
CUDTGroup::SocketData* m_GroupMemberData; //< Pointer to group member data, or NULL if not a group member
CUDTGroup* m_GroupOf; //< Group this socket is a member of, or NULL if it isn't
#endif

int32_t m_iISN; //< initial sequence number, used to tell different connection from same IP:port
Expand Down
Loading

0 comments on commit c12e619

Please sign in to comment.