Skip to content

Commit

Permalink
[core] CRcvQueue tracks IP version instead of CUnitQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jul 12, 2022
1 parent 830c599 commit c3fed9c
Showing 4 changed files with 20 additions and 36 deletions.
16 changes: 5 additions & 11 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
@@ -72,7 +72,6 @@ srt::CUnitQueue::CUnitQueue()
, m_iSize(0)
, m_iCount(0)
, m_iMSS()
, m_iIPversion()
{
}

@@ -94,7 +93,7 @@ srt::CUnitQueue::~CUnitQueue()
}
}

int srt::CUnitQueue::init(int size, int mss, int version)
int srt::CUnitQueue::init(int size, int mss)
{
CQEntry* tempq = NULL;
CUnit* tempu = NULL;
@@ -131,7 +130,6 @@ int srt::CUnitQueue::init(int size, int mss, int version)

m_iSize = size;
m_iMSS = mss;
m_iIPversion = version;

return 0;
}
@@ -186,12 +184,6 @@ int srt::CUnitQueue::increase()
return 0;
}

int srt::CUnitQueue::shrink()
{
// currently queue cannot be shrunk.
return -1;
}

srt::CUnit* srt::CUnitQueue::getNextAvailUnit()
{
if (m_iCount * 10 > m_iSize * 9)
@@ -1123,6 +1115,7 @@ srt::CRcvQueue::CRcvQueue()
, m_pHash(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_iIPversion()
, m_szPayloadSize()
, m_bClosing(false)
, m_LSLock()
@@ -1170,9 +1163,10 @@ srt::sync::atomic<int> srt::CRcvQueue::m_counter(0);

void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CChannel* cc, CTimer* t)
{
m_iIPversion = version;
m_szPayloadSize = payload;

m_UnitQueue.init(qsize, (int)payload, version);
m_UnitQueue.init(qsize, (int)payload);

m_pHash = new CHash;
m_pHash->init(hsize);
@@ -1199,7 +1193,7 @@ void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CCh
void* srt::CRcvQueue::worker(void* param)
{
CRcvQueue* self = (CRcvQueue*)param;
sockaddr_any sa(self->m_UnitQueue.getIPversion());
sockaddr_any sa(self->getIPversion());
int32_t id = 0;

#if ENABLE_LOGGING
32 changes: 11 additions & 21 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
@@ -90,39 +90,29 @@ class CUnitQueue
~CUnitQueue();

public: // Storage size operations
/// Initialize the unit queue.
/// @param [in] size queue size
/// @param [in] mss maximum segment size
/// @param [in] version IP version
/// @return 0: success, -1: failure.
int init(int size, int mss, int version);
/// Initialize the unit queue.
/// @param [in] size queue size
/// @param [in] mss maximum segment size
/// @return 0: success, -1: failure.
int init(int size, int mss);

/// Increase (double) the unit queue size.
/// @return 0: success, -1: failure.

int increase();

/// Decrease (halve) the unit queue size.
/// @return 0: success, -1: failure.

int shrink();

public:
int size() const { return m_iSize - m_iCount; }
int capacity() const { return m_iSize; }

public: // Operations on units
/// find an available unit for incoming packet.
/// @return Pointer to the available unit, NULL if not found.
/// find an available unit for incoming packet.
/// @return Pointer to the available unit, NULL if not found.
CUnit* getNextAvailUnit();

void makeUnitFree(CUnit* unit);

void makeUnitGood(CUnit* unit);

public:
inline int getIPversion() const { return m_iIPversion; }

private:
struct CQEntry
{
@@ -141,7 +131,6 @@ class CUnitQueue
sync::atomic<int> m_iCount; // total number of valid (occupied) packets in the queue

int m_iMSS; // unit buffer size
int m_iIPversion; // IP version

private:
CUnitQueue(const CUnitQueue&);
@@ -510,20 +499,20 @@ class CRcvQueue
/// @param [in] hsize hash table size
/// @param [in] c UDP channel to be associated to the queue
/// @param [in] t timer

void init(int size, size_t payload, int version, int hsize, CChannel* c, sync::CTimer* t);

/// Read a packet for a specific UDT socket id.
/// @param [in] id Socket ID
/// @param [out] packet received packet
/// @return Data size of the packet

int recvfrom(int32_t id, CPacket& to_packet);

void stopWorker();

void setClosing() { m_bClosing = true; }

int getIPversion() { return m_iIPversion; }

private:
static void* worker(void* param);
sync::CThread m_WorkerThread;
@@ -540,7 +529,8 @@ class CRcvQueue
CChannel* m_pChannel; // UDP channel for receving packets
sync::CTimer* m_pTimer; // shared timer with the snd queue

size_t m_szPayloadSize; // packet payload size
int m_iIPversion; // IP version
size_t m_szPayloadSize; // packet payload size

sync::atomic<bool> m_bClosing; // closing the worker
#if ENABLE_LOGGING
2 changes: 1 addition & 1 deletion test/test_buffer.cpp
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ class CRcvBufferReadMsg
// make_unique is unfortunatelly C++14
m_unit_queue = unique_ptr<CUnitQueue>(new CUnitQueue);
ASSERT_NE(m_unit_queue.get(), nullptr);
m_unit_queue->init(m_buff_size_pkts, 1500, AF_INET);
m_unit_queue->init(m_buff_size_pkts, 1500);

#if ENABLE_NEW_RCVBUFFER
const bool enable_msg_api = m_use_message_api;
6 changes: 3 additions & 3 deletions test/test_unitqueue.cpp
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ TEST(CUnitQueue, Increase)
{
const int buffer_size_pkts = 4;
CUnitQueue unit_queue;
unit_queue.init(buffer_size_pkts, 1500, AF_INET);
unit_queue.init(buffer_size_pkts, 1500);

vector<CUnit*> taken_units;
for (int i = 0; i < 5 * buffer_size_pkts; ++i)
@@ -38,7 +38,7 @@ TEST(CUnitQueue, IncreaseAndFree)
{
const int buffer_size_pkts = 4;
CUnitQueue unit_queue;
unit_queue.init(buffer_size_pkts, 1500, AF_INET);
unit_queue.init(buffer_size_pkts, 1500);

CUnit* taken_unit = nullptr;
for (int i = 0; i < 5 * buffer_size_pkts; ++i)
@@ -63,7 +63,7 @@ TEST(CUnitQueue, IncreaseAndFreeGrouped)
{
const int buffer_size_pkts = 4;
CUnitQueue unit_queue;
unit_queue.init(buffer_size_pkts, 1500, AF_INET);
unit_queue.init(buffer_size_pkts, 1500);

vector<CUnit*> taken_units;
for (int i = 0; i < 5 * buffer_size_pkts; ++i)

0 comments on commit c3fed9c

Please sign in to comment.