Skip to content

Commit

Permalink
[core] Refax: placing SRT classes inside 'srt' namespace.
Browse files Browse the repository at this point in the history
CUDT, CUDTUnited, CUDTSocket, CUDTGroup, etc.
  • Loading branch information
maxsharabayko committed May 19, 2021
1 parent 4ddb68e commit 445a60c
Show file tree
Hide file tree
Showing 37 changed files with 686 additions and 631 deletions.
370 changes: 172 additions & 198 deletions srtcore/api.cpp

Large diffs are not rendered by default.

38 changes: 21 additions & 17 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ modified by
// Please refer to structure and locking information provided in the
// docs/dev/low-level-info.md document.

namespace srt {

class CUDT;

class CUDTSocket
Expand Down Expand Up @@ -107,7 +109,7 @@ class CUDTSocket
/// of sockets in order to prevent other methods from accessing invalid address.
/// A timer is started and the socket will be removed after approximately
/// 1 second (see CUDTUnited::checkBrokenSockets()).
srt::sync::steady_clock::time_point m_tsClosureTimeStamp;
sync::steady_clock::time_point m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
Expand All @@ -117,18 +119,18 @@ class CUDTSocket

SRTSOCKET m_PeerID; //< peer socket ID
#if ENABLE_EXPERIMENTAL_BONDING
srt::groups::SocketData* m_GroupMemberData; //< Pointer to group member data, or NULL if not a group member
groups::SocketData* m_GroupMemberData; //< Pointer to group member data, or NULL if not a group member
CUDTGroup* m_GroupOf; //< Group this socket is a member of, or NULL if it isn't
#endif

int32_t m_iISN; //< initial sequence number, used to tell different connection from same IP:port

CUDT* m_pUDT; //< pointer to the UDT entity

std::set<SRTSOCKET> m_QueuedSockets; //< set of connections waiting for accept()
std::set<SRTSOCKET> m_QueuedSockets; //< set of connections waiting for accept()

srt::sync::Condition m_AcceptCond; //< used to block "accept" call
srt::sync::Mutex m_AcceptLock; //< mutex associated to m_AcceptCond
sync::Condition m_AcceptCond; //< used to block "accept" call
sync::Mutex m_AcceptLock; //< mutex associated to m_AcceptCond

unsigned int m_uiBackLog; //< maximum number of connections in queue

Expand All @@ -143,9 +145,9 @@ class CUDTSocket
// When deleting, you simply "unsubscribe" yourself from the multiplexer, which
// will unref it and remove the list element by the iterator kept by the
// socket.
int m_iMuxID; //< multiplexer ID
int m_iMuxID; //< multiplexer ID

srt::sync::Mutex m_ControlLock; //< lock this socket exclusively for control APIs: bind/listen/connect
sync::Mutex m_ControlLock; //< lock this socket exclusively for control APIs: bind/listen/connect

CUDT& core() { return *m_pUDT; }

Expand Down Expand Up @@ -346,12 +348,12 @@ friend class CRendezvousQueue;
groups_t m_Groups;
#endif

srt::sync::Mutex m_GlobControlLock; // used to synchronize UDT API
sync::Mutex m_GlobControlLock; // used to synchronize UDT API

srt::sync::Mutex m_IDLock; // used to synchronize ID generation
sync::Mutex m_IDLock; // used to synchronize ID generation

SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one
SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one

std::map<int64_t, std::set<SRTSOCKET> > m_PeerRec;// record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn

Expand Down Expand Up @@ -394,7 +396,7 @@ friend class CRendezvousQueue;
// We have a guarantee that if `group` was set
// as non-NULL here, it is also acquired and will not
// be deleted until this busy flag is set back to false.
srt::sync::ScopedLock cgroup (*group->exp_groupLock());
sync::ScopedLock cgroup (*group->exp_groupLock());
group->apiRelease();
// Only now that the group lock is lifted, can the
// group be now deleted and this pointer potentially dangling
Expand All @@ -408,21 +410,21 @@ friend class CRendezvousQueue;

private:
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer
srt::sync::Mutex m_MultiplexerLock;
sync::Mutex m_MultiplexerLock;

private:
CCache<CInfoBlock>* m_pCache; // UDT network information cache

private:
volatile bool m_bClosing;
srt::sync::Mutex m_GCStopLock;
srt::sync::Condition m_GCStopCond;
sync::Mutex m_GCStopLock;
sync::Condition m_GCStopCond;

srt::sync::Mutex m_InitLock;
sync::Mutex m_InitLock;
int m_iInstanceCount; // number of startup() called by application
bool m_bGCStatus; // if the GC thread is working (true)

srt::sync::CThread m_GCThread;
sync::CThread m_GCThread;
static void* garbageCollect(void*);

sockets_t m_ClosedSockets; // temporarily store closed sockets
Expand All @@ -440,4 +442,6 @@ friend class CRendezvousQueue;
CUDTUnited& operator=(const CUDTUnited&);
};

} // namespace srt

#endif
15 changes: 8 additions & 7 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ modified by

using namespace std;
using namespace srt_logging;
using namespace srt;
using namespace srt::sync;

// You can change this value at build config by using "ENFORCE" options.
Expand Down Expand Up @@ -317,7 +318,7 @@ void CSndBuffer::updateInputRate(const steady_clock::time_point& time, int pkts,
if (early_update || period_us > m_InRatePeriod)
{
// Required Byte/sec rate (payload + headers)
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBytesCount += (m_iInRatePktsCount * srt::CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
HLOGC(bslog.Debug,
log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
Expand Down Expand Up @@ -410,7 +411,7 @@ steady_clock::time_point CSndBuffer::getSourceTime(const CSndBuffer::Block& bloc
return block.m_tsOriginTime;
}

int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs)
int CSndBuffer::readData(srt::CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs)
{
// No data to read
if (m_pCurrBlock == m_pLastBlock)
Expand Down Expand Up @@ -511,7 +512,7 @@ int32_t CSndBuffer::getMsgNoAt(const int offset)
return p->getMsgSeq();
}

int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time_point& w_srctime, int& w_msglen)
int CSndBuffer::readData(const int offset, srt::CPacket& w_packet, steady_clock::time_point& w_srctime, int& w_msglen)
{
int32_t& msgno_bitset = w_packet.m_iMsgNo;

Expand Down Expand Up @@ -927,7 +928,7 @@ int CRcvBuffer::readBuffer(char* data, int len)
return -1;
}

const CPacket& pkt = m_pUnit[p]->m_Packet;
const srt::CPacket& pkt = m_pUnit[p]->m_Packet;

if (bTsbPdEnabled)
{
Expand Down Expand Up @@ -996,7 +997,7 @@ int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
continue;
}

const CPacket& pkt = m_pUnit[p]->m_Packet;
const srt::CPacket& pkt = m_pUnit[p]->m_Packet;

#if ENABLE_LOGGING
trace_seq = pkt.getSeqNo();
Expand Down Expand Up @@ -1436,7 +1437,7 @@ bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t&

if (m_tsbpd.isEnabled())
{
const CPacket* pkt = getRcvReadyPacket(seqdistance);
const srt::CPacket* pkt = getRcvReadyPacket(seqdistance);
if (!pkt)
{
HLOGC(brlog.Debug, log << "isRcvDataReady: packet NOT extracted.");
Expand Down Expand Up @@ -1573,7 +1574,7 @@ void CRcvBuffer::reportBufferStats() const
uint64_t lower_time = low_ts;

if (lower_time > upper_time)
upper_time += uint64_t(CPacket::MAX_TIMESTAMP) + 1;
upper_time += uint64_t(srt::CPacket::MAX_TIMESTAMP) + 1;

int32_t timespan = upper_time - lower_time;
int seqspan = 0;
Expand Down
16 changes: 8 additions & 8 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class CSndBuffer
/// @param [out] origintime origin time stamp of the message
/// @param [in] kflags Odd|Even crypto key flag
/// @return Actual length of data read.
int readData(CPacket& w_packet, time_point& w_origintime, int kflgs);
int readData(srt::CPacket& w_packet, time_point& w_origintime, int kflgs);

/// Find data position to pack a DATA packet for a retransmission.
/// @param [out] data the pointer to the data position.
Expand All @@ -154,7 +154,7 @@ class CSndBuffer
/// @param [out] origintime origin time stamp of the message
/// @param [out] msglen length of the message
/// @return Actual length of data read.
int readData(const int offset, CPacket& w_packet, time_point& w_origintime, int& w_msglen);
int readData(const int offset, srt::CPacket& w_packet, time_point& w_origintime, int& w_msglen);

/// Get the time of the last retransmission (if any) of the DATA packet.
/// @param [in] offset offset from the last ACK point (backward sequence number difference)
Expand Down Expand Up @@ -288,15 +288,15 @@ class CRcvBuffer
/// Construct the buffer.
/// @param [in] queue CUnitQueue that actually holds the units (packets)
/// @param [in] bufsize_pkts in units (packets)
CRcvBuffer(CUnitQueue* queue, int bufsize_pkts = DEFAULT_SIZE);
CRcvBuffer(srt::CUnitQueue* queue, int bufsize_pkts = DEFAULT_SIZE);
~CRcvBuffer();

public:
/// Write data into the buffer.
/// @param [in] unit pointer to a data unit containing new packet
/// @param [in] offset offset from last ACK point.
/// @return 0 is success, -1 if data is repeated.
int addData(CUnit* unit, int offset);
int addData(srt::CUnit* unit, int offset);

/// Read data into a user buffer.
/// @param [in] data pointer to user buffer.
Expand Down Expand Up @@ -402,7 +402,7 @@ class CRcvBuffer

bool isRcvDataReady();
bool isRcvDataAvailable() { return m_iLastAckPos != m_iStartPos; }
CPacket* getRcvReadyPacket(int32_t seqdistance);
srt::CPacket* getRcvReadyPacket(int32_t seqdistance);

/// Set TimeStamp-Based Packet Delivery Rx Mode
/// @param [in] timebase localtime base (uSec) of packet time stamps including buffering delay
Expand Down Expand Up @@ -462,7 +462,7 @@ class CRcvBuffer
/// data.
size_t freeUnitAt(size_t p)
{
CUnit* u = m_pUnit[p];
srt::CUnit* u = m_pUnit[p];
m_pUnit[p] = NULL;
size_t rmbytes = u->m_Packet.getLength();
m_pUnitQueue->makeUnitFree(u);
Expand Down Expand Up @@ -528,9 +528,9 @@ class CRcvBuffer
}

private:
CUnit** m_pUnit; // Array of pointed units collected in the buffer
srt::CUnit** m_pUnit; // Array of pointed units collected in the buffer
const int m_iSize; // Size of the internal array of CUnit* items
CUnitQueue* m_pUnitQueue; // the shared unit queue
srt::CUnitQueue* m_pUnitQueue; // the shared unit queue

int m_iStartPos; // HEAD: first packet available for reading
int m_iLastAckPos; // the last ACKed position (exclusive), follows the last readable
Expand Down
Loading

0 comments on commit 445a60c

Please sign in to comment.