Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] CheckValidSockets no longer remove sockets from group, this was causing inconsistency. #1687

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,16 @@ void CUDTGroup::send_CheckValidSockets()
CUDTSocket* revps = m_pGlobal->locateSocket_LOCKED(d->id);
if (revps != d->ps)
{
HLOGC(gmlog.Debug, log << "group/send_CheckValidSockets: socket @" << d->id << " is no longer valid, REMOVING FROM $" << id());
remove_LOCKED(d->id);
// Note: the socket might STILL EXIST, just in the trash, so
// it can't be found by locateSocket. But it can still be bound
// to the group. Just mark it broken from upside so that the
// internal sending procedures will skip it. Removal from the
// group will happen in GC, which will both remove from
// group container and cut backward links to the group.

HLOGC(gmlog.Debug, log << "group/send_CheckValidSockets: socket @" << d->id << " is no longer valid, setting BROKEN in $" << id());
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
Comment on lines +1073 to +1074
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we safe to set this to broken? This happens under these locks:

  • m_pGlobal->m_GlobControlLock
  • m_GroupLock

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. The first one protects the sockets against being updated against the group (that is, removed from the group container in the meantime), the second one protects the m_Group container (members container). Note also that this function is being called from inside the sending function only, so it happens in the same thread as the transmission functions. A simultaneous call to a receiving function, let's say, isn't possible because it also requires lock on m_Group.

}
}
}
Expand Down Expand Up @@ -1137,7 +1145,6 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
// LOCKED: GroupLock (only)
// Since this moment GlobControlLock may only be locked if GroupLock is unlocked first.


if (m_bClosing)
{
// No temporary locks here. The group lock is scoped.
Expand All @@ -1147,18 +1154,21 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
// This simply requires the payload to be sent through every socket in the group
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d)
{
// Check the socket state prematurely in order not to uselessly
// send over a socket that is broken.
CUDT* pu = 0;
if (d->ps)
pu = &d->ps->core();

if (!pu || pu->m_bBroken)
if (d->sndstate != SRT_GST_BROKEN)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket @" << d->id << " detected +Broken - transit to BROKEN");
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
// Check the socket state prematurely in order not to uselessly
// send over a socket that is broken.
CUDT* const pu = (d->ps)
? &d->ps->core()
: NULL;

if (!pu || pu->m_bBroken)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket @" << d->id << " detected +Broken - transit to BROKEN");
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
}
}

// Check socket sndstate before sending
Expand Down Expand Up @@ -3776,17 +3786,20 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
// First, check status of every link - no matter if idle or active.
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d)
{
// Check the socket state prematurely in order not to uselessly
// send over a socket that is broken.
CUDT* pu = 0;
if (d->ps)
pu = &d->ps->core();

if (!pu || pu->m_bBroken)
if (d->sndstate != SRT_GST_BROKEN)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: socket @" << d->id << " detected +Broken - transit to BROKEN");
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
// Check the socket state prematurely in order not to uselessly
// send over a socket that is broken.
CUDT* const pu = (d->ps)
? &d->ps->core()
: NULL;

if (!pu || pu->m_bBroken)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: socket @" << d->id << " detected +Broken - transit to BROKEN");
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
}
}

// Check socket sndstate before sending
Expand Down
9 changes: 2 additions & 7 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,9 @@ class CUDTGroup
/// @return true if the container still contains any sockets after the operation
bool remove(SRTSOCKET id)
{
using srt_logging::gmlog;
srt::sync::ScopedLock g(m_GroupLock);
return remove_LOCKED(id);
}

// No-locking version of the function above.
bool remove_LOCKED(SRTSOCKET id)
{
using srt_logging::gmlog;
bool empty = false;
HLOGC(gmlog.Debug, log << "group/remove: going to remove @" << id << " from $" << m_GroupID);

Expand Down Expand Up @@ -200,7 +195,7 @@ class CUDTGroup
}
else
{
HLOGC(gmlog.Debug, log << "group/remove: IPE: id @" << id << " NOT FOUND (might be ok, if removed already by CheckValidSockets)");
HLOGC(gmlog.Debug, log << "group/remove: IPE: id @" << id << " NOT FOUND");
empty = true; // not exactly true, but this is to cause error on group in the APP
}

Expand Down