WIP: Fixing synchronisation of the TSBPD thread #1620

Closed


maxsharabayko
Collaborator

@maxsharabayko maxsharabayko commented Oct 22, 2020

Issue: The TSBPD thread may never be joined if CUDT::releaseSynch() finishes while the receiving thread is still active and is about to create that thread.

Changes

  1. The CUDT::m_RecvDataCond condition variable and the CUDT::m_RecvDataLock mutex are replaced by a single CUDT::m_ReadyToReadEvent event (of type CEvent, which combines a condition variable with its corresponding mutex).

TODO

  • add description

Closes #1606

Includes unit test from #1619

@maxsharabayko maxsharabayko added the "Type: Bug" and "[core]" labels Oct 22, 2020
@maxsharabayko maxsharabayko added this to the v1.5.0 milestone Oct 22, 2020
@maxsharabayko maxsharabayko self-assigned this Oct 22, 2020
@mbakholdina mbakholdina modified the milestones: v1.5.0, v1.4.3 Oct 22, 2020
@maxsharabayko maxsharabayko linked an issue Oct 23, 2020 that may be closed by this pull request
@maxsharabayko maxsharabayko force-pushed the hotfix/tsbpd-thread-sync branch from 8bf1a64 to 982bbb0 Compare October 26, 2020 15:27
@maxsharabayko
Collaborator Author

Overview of synchronization between the core threads.

Note. CRcvQueue::m_pRcvUList may still have this socket, and may be processing it during releaseSynch(). Some synchronization is likely required.

tsbpd()

Runs in a separate internal thread, one per receiving socket.

CUDT::tsbpd():

{
    UniqueLock recv_lock  (self->m_RecvLock);
    CSync tsbpd_cc    (self->m_RcvTsbPdCond, recv_lock);
    
    while (!self->m_bClosing)
    {
        enterCS(self->m_RcvBufferLock);
        self->m_pRcvBuffer->getRcvFirstMsg();
        self->m_pRcvBuffer->skipData(seqlen);
        self->m_pRcvBuffer->isRcvDataReady(..);
        leaveCS(self->m_RcvBufferLock);
        
        if (self->m_bSynRecving)
           // TODO: [SYNC] Lock before signalling?
           self->m_ReadyToReadEvent.notify_one();
        self->s_UDTUnited.m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true);
        CGlobEvent::triggerEvent();
        
        if (tsbpdtime)
            tsbpd_cc.wait_for(timediff);
        else
            tsbpd_cc.wait();
    }
}

receiveMessage()

CUDT::receiveMessage() is called from the srt_recvmsg(..) thread.

CUDT::receiveMessage():

{
    UniqueLock recvguard (m_RecvLock);
    CSync tscond     (m_RcvTsbPdCond,  recvguard);
    
    if (m_bBroken || m_bClosing)
    {
        enterCS(m_RcvBufferLock);
        const int res = m_pRcvBuffer->readMsg(data, len);
        leaveCS(m_RcvBufferLock);
        if (m_bTsbPd)
        {
            HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup");
            tscond.signal_locked(recvguard);
        }
    }
    
    if (!m_bSynRecving)
    {
        enterCS(m_RcvBufferLock);
        const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance);
        leaveCS(m_RcvBufferLock);
        
        if (m_bTsbPd)
        {
            HLOGP(arlog.Debug, "receiveMessage: nothing to read, kicking TSBPD, return AGAIN");
            tscond.signal_locked(recvguard);
        }
        
        if (!m_pRcvBuffer->isRcvDataReady())
        {
            // Kick TsbPd thread to schedule next wakeup (if running)
            if (m_bTsbPd)
            {
                HLOGP(arlog.Debug, "receiveMessage: DATA READ, but nothing more - kicking TSBPD.");
                tscond.signal_locked(recvguard);
            }
        }
        return res;
    }
    
    do
    {
        if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(..))
        {
            /* Kick TsbPd thread to schedule next wakeup (if running) */
            if (m_bTsbPd)
                tscond.signal_locked(recvguard);

            do
            {
                if (!m_ReadyToReadEvent.lock_wait_until(exptime))
                {
                    if (m_iRcvTimeOut >= 0) // otherwise it's "no timeout set"
                        timeout = true;
                }
            } while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
            
            
        }
        
        enterCS(m_RcvBufferLock);
        res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance);
        leaveCS(m_RcvBufferLock);

    } while ((res == 0) && !timeout);
            
    if (!m_pRcvBuffer->isRcvDataReady())
    {
        // Kick TsbPd thread to schedule next wakeup (if running)
        if (m_bTsbPd)
            tscond.signal_locked(recvguard);

        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
    }
    
    return res;
}

processData(..)

CUDT::processData(CUnit* ) is called from the internal receiving thread.

CUDT::processData(..):

{
    const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

    // We are receiving data, start tsbpd thread if TsbPd is enabled
    if (need_tsbpd && !m_RcvTsbPdThread.joinable())
    {
        StartThread(m_RcvTsbPdThread, CUDT::tsbpd, this, thname);
    }
    
    {
        UniqueLock recvbuf_acklock(m_RcvBufferLock);
        m_pRcvBuffer->addData(*i, offset);
    }

    // Wake up TSBPD on loss to reschedule possible TL drop
    if (!srt_loss_seqs.empty() && m_bTsbPd)
        CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

    if (!filter_loss_seqs.empty() && m_bTsbPd)
        CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
}

releaseSynch()

CUDT::releaseSynch() is called:

  • on UMSG_SHUTDOWN
  • from CUDT::checkExpTimer
  • in CUDT::processData (SEQUENCE DISCREPANCY. BREAKING CONNECTION)
  • in srt_close(), or in the sending thread in non-blocking mode
  • in the Garbage Collector thread (checkBrokenSockets(..) or makeClosed())
  • in srt_connect

CUDT::releaseSynch():

{
    // wake up user calls
    CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);

    enterCS(m_SendLock);
    leaveCS(m_SendLock);

    m_ReadyToReadEvent.lock_notify_one();
    CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

    // TODO: [SYNC] Protect TSBPD Thread join
    //enterCS(m_NewDataReceivedLock);
    if (m_RcvTsbPdThread.joinable())
    {
        m_RcvTsbPdThread.join();
    }
    //leaveCS(m_NewDataReceivedLock);

    enterCS(m_RecvLock);
    leaveCS(m_RecvLock);
}
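The interplay between the lazy thread start in processData(..) and the join in releaseSynch() can be modelled in isolation. The sketch below is one possible way to close such a race, under the assumption that thread creation and join are serialized by a single mutex and that creation is refused once closing has begun. `Receiver`, `onData`, `release` and `workerJoinable` are hypothetical names; this is not SRT code and not necessarily the approach this PR takes.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical model, NOT SRT code: a lazily started worker thread whose
// creation and join are serialized by the same mutex.
class Receiver
{
public:
    ~Receiver() { release(); }

    // "Receiving" thread: start the worker on first use.
    // Returns false if closing has already begun (no thread is created).
    bool onData()
    {
        std::lock_guard<std::mutex> lk(m_lock);
        if (m_closing)
            return false;
        if (!m_worker.joinable())
            m_worker = std::thread(&Receiver::run, this);
        return true;
    }

    // "Closing" thread: after this returns, no worker exists and
    // none can be created any more.
    void release()
    {
        std::thread worker;
        {
            std::lock_guard<std::mutex> lk(m_lock);
            m_closing = true;
            worker.swap(m_worker); // take ownership under the lock
        }
        m_cond.notify_all();
        if (worker.joinable())
            worker.join();         // join outside the lock so run() can finish
    }

    bool workerJoinable()
    {
        std::lock_guard<std::mutex> lk(m_lock);
        return m_worker.joinable();
    }

private:
    // Worker body: sleep until closing is requested.
    void run()
    {
        std::unique_lock<std::mutex> lk(m_lock);
        m_cond.wait(lk, [this] { return m_closing; });
    }

    std::mutex              m_lock;
    std::condition_variable m_cond;
    bool                    m_closing = false;
    std::thread             m_worker;
};
```

Whichever of `onData()` and `release()` takes the mutex first, the outcome is consistent: either the thread is created and then joined, or it is never created at all.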

Development

Successfully merging this pull request may close these issues.

[BUG] Potential leak of TSBPD thread