Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup groups: Take source rate estimate from an active link on idle member activation #2260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 53 additions & 45 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,58 @@ int round_val(double val)
return static_cast<int>(round(val));
}

CRateEstimator::CRateEstimator()
: m_iInRatePktsCount(0)
, m_iInRateBytesCount(0)
, m_InRatePeriod(INPUTRATE_FAST_START_US) // 0.5 sec (fast start)
, m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
{}

void CRateEstimator::setInputRateSmpPeriod(int period)
{
m_InRatePeriod = (uint64_t)period; //(usec) 0=no input rate calculation
}

void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes)
{
// no input rate calculation
if (m_InRatePeriod == 0)
return;

if (is_zero(m_tsInRateStartTime))
{
m_tsInRateStartTime = time;
return;
}
else if (time < m_tsInRateStartTime)
{
// Old packets are being submitted for estimation, e.g. during the backup link activation.
return;
}

m_iInRatePktsCount += pkts;
m_iInRateBytesCount += bytes;

// Trigger early update in fast start mode
const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US) && (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);

const uint64_t period_us = count_microseconds(time - m_tsInRateStartTime);
if (early_update || period_us > m_InRatePeriod)
{
// Required Byte/sec rate (payload + headers)
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
HLOGC(bslog.Debug,
log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
<< " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
m_iInRatePktsCount = 0;
m_iInRateBytesCount = 0;
m_tsInRateStartTime = time;

setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
}
}

CSndBuffer::CSndBuffer(int size, int mss)
: m_BufLock()
, m_pBlock(NULL)
Expand All @@ -122,10 +174,6 @@ CSndBuffer::CSndBuffer(int size, int mss)
, m_iMSS(mss)
, m_iCount(0)
, m_iBytesCount(0)
, m_iInRatePktsCount(0)
, m_iInRateBytesCount(0)
, m_InRatePeriod(INPUTRATE_FAST_START_US) // 0.5 sec (fast start)
, m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
{
// initial physical buffer of "size"
m_pBuffer = new Buffer;
Expand Down Expand Up @@ -273,7 +321,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
m_iCount += size;
m_iBytesCount += len;

updateInputRate(m_tsLastOriginTime, size, len);
m_rateEstimator.updateInputRate(m_tsLastOriginTime, size, len);
updAvgBufSize(m_tsLastOriginTime);

// MSGNO_SEQ::mask has a form: 00000011111111...
Expand All @@ -287,46 +335,6 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
m_iNextMsgNo = nextmsgno;
}

void CSndBuffer::setInputRateSmpPeriod(int period)
{
m_InRatePeriod = (uint64_t)period; //(usec) 0=no input rate calculation
}

void CSndBuffer::updateInputRate(const steady_clock::time_point& time, int pkts, int bytes)
{
// no input rate calculation
if (m_InRatePeriod == 0)
return;

if (is_zero(m_tsInRateStartTime))
{
m_tsInRateStartTime = time;
return;
}

m_iInRatePktsCount += pkts;
m_iInRateBytesCount += bytes;

// Trigger early update in fast start mode
const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US) && (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);

const uint64_t period_us = count_microseconds(time - m_tsInRateStartTime);
if (early_update || period_us > m_InRatePeriod)
{
// Required Byte/sec rate (payload + headers)
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
HLOGC(bslog.Debug,
log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
<< " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
m_iInRatePktsCount = 0;
m_iInRateBytesCount = 0;
m_tsInRateStartTime = time;

setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
}
}

int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
{
int size = len / m_iMSS;
Expand Down
75 changes: 49 additions & 26 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,47 @@ class AvgBufSize
double m_dTimespanMAvg;
};

/// The class to estimate source bitrate based on samples submitted to the buffer.
/// Is currently only used by the CSndBuffer.
class CRateEstimator
{
typedef sync::steady_clock::time_point time_point;
typedef sync::steady_clock::duration duration;
public:
CRateEstimator();

public:
uint64_t getInRatePeriod() const { return m_InRatePeriod; }

/// Retrieve input bitrate in bytes per second
int getInputRate() const { return m_iInRateBps; }

void setInputRateSmpPeriod(int period);

/// Update input rate calculation.
/// @param [in] time current time in microseconds
/// @param [in] pkts number of packets newly added to the buffer
/// @param [in] bytes number of payload bytes in those newly added packets
///
/// @return Current size of the data in the sending list.
void updateInputRate(const time_point& time, int pkts = 0, int bytes = 0);

void resetInputRateSmpPeriod(bool disable = false) { setInputRateSmpPeriod(disable ? 0 : INPUTRATE_FAST_START_US); }

private: // Constants
static const uint64_t INPUTRATE_FAST_START_US = 500000; // 500 ms
static const uint64_t INPUTRATE_RUNNING_US = 1000000; // 1000 ms
static const int64_t INPUTRATE_MAX_PACKETS = 2000; // ~ 21 Mbps of 1316 bytes payload
static const int INPUTRATE_INITIAL_BYTESPS = BW_INFINITE;

private:
int m_iInRatePktsCount; // number of payload bytes added since InRateStartTime
int m_iInRateBytesCount; // number of payload bytes added since InRateStartTime
time_point m_tsInRateStartTime;
uint64_t m_InRatePeriod; // usec
int m_iInRateBps; // Input Rate in Bytes/sec
};

class CSndBuffer
{
typedef sync::steady_clock::time_point time_point;
Expand Down Expand Up @@ -195,30 +236,19 @@ class CSndBuffer
SRT_ATTR_EXCLUDES(m_BufLock)
duration getBufferingDelay(const time_point& tnow) const;

uint64_t getInRatePeriod() const { return m_InRatePeriod; }
uint64_t getInRatePeriod() const { return m_rateEstimator.getInRatePeriod(); }

/// Retrieve input bitrate in bytes per second
int getInputRate() const { return m_iInRateBps; }
int getInputRate() const { return m_rateEstimator.getInputRate(); }

/// Update input rate calculation.
/// @param [in] time current time in microseconds
/// @param [in] pkts number of packets newly added to the buffer
/// @param [in] bytes number of payload bytes in those newly added packets
///
/// @return Current size of the data in the sending list.
void updateInputRate(const time_point& time, int pkts = 0, int bytes = 0);
void resetInputRateSmpPeriod(bool disable = false) { m_rateEstimator.resetInputRateSmpPeriod(disable); }

void resetInputRateSmpPeriod(bool disable = false) { setInputRateSmpPeriod(disable ? 0 : INPUTRATE_FAST_START_US); }
const CRateEstimator& getRateEstimator() const { return m_rateEstimator; }

void setRateEstimator(const CRateEstimator& other) { m_rateEstimator = other; }

private:
void increase();
void setInputRateSmpPeriod(int period);

private: // Constants
static const uint64_t INPUTRATE_FAST_START_US = 500000; // 500 ms
static const uint64_t INPUTRATE_RUNNING_US = 1000000; // 1000 ms
static const int64_t INPUTRATE_MAX_PACKETS = 2000; // ~ 21 Mbps of 1316 bytes payload
static const int INPUTRATE_INITIAL_BYTESPS = BW_INFINITE;

private:
mutable sync::Mutex m_BufLock; // used to synchronize buffer operation
Expand Down Expand Up @@ -249,7 +279,7 @@ class CSndBuffer

// m_pBlock: The head pointer
// m_pFirstBlock: The first block
// m_pCurrBlock: The current block
// m_pCurrBlock: The current block
// m_pLastBlock: The last block (if first == last, buffer is empty)

struct Buffer
Expand All @@ -263,20 +293,13 @@ class CSndBuffer

int m_iSize; // buffer size (number of packets)
int m_iMSS; // maximum seqment/packet size

int m_iCount; // number of used blocks

int m_iBytesCount; // number of payload bytes in queue
time_point m_tsLastOriginTime;

AvgBufSize m_mavg;

int m_iInRatePktsCount; // number of payload bytes added since InRateStartTime
int m_iInRateBytesCount; // number of payload bytes added since InRateStartTime
time_point m_tsInRateStartTime;
uint64_t m_InRatePeriod; // usec
int m_iInRateBps; // Input Rate in Bytes/sec
int m_iAvgPayloadSz; // Average packet payload size
CRateEstimator m_rateEstimator;

private:
CSndBuffer(const CSndBuffer&);
Expand Down
3 changes: 2 additions & 1 deletion srtcore/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ std::string TransmissionEventStr(ETransmissionEvent ev)
"checktimer",
"send",
"receive",
"custom"
"custom",
"sync"
};

size_t vals_size = Size(vals);
Expand Down
1 change: 1 addition & 0 deletions srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ enum ETransmissionEvent
TEV_SEND, // --> When the packet is scheduled for sending - older CCC::onPktSent
TEV_RECEIVE, // --> When a data packet was received - older CCC::onPktReceived
TEV_CUSTOM, // --> probably dead call - older CCC::processCustomMsg
TEV_SYNC, // --> Backup group. When rate estimation is derived from an active member, and update is needed.

TEV_E_SIZE
};
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7550,7 +7550,7 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg)

// This part is also required only by LiveCC, however not
// moved there due to that it needs access to CSndBuffer.
if (evt == TEV_ACK || evt == TEV_LOSSREPORT || evt == TEV_CHECKTIMER)
if (evt == TEV_ACK || evt == TEV_LOSSREPORT || evt == TEV_CHECKTIMER || evt == TEV_SYNC)
{
// Specific part done when MaxBW is set to 0 (auto) and InputBW is 0.
// This requests internal input rate sampling.
Expand Down
16 changes: 16 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,22 @@ class CUDT
static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt);

CRateEstimator getRateEstimator() const
{
if (!m_pSndBuffer)
return CRateEstimator();
return m_pSndBuffer->getRateEstimator();
}

void setRateEstimator(const CRateEstimator& rate)
{
if (!m_pSndBuffer)
return;

m_pSndBuffer->setRateEstimator(rate);
updateCC(TEV_SYNC, EventVariant(0));
}


private: // Identification
CUDTSocket* const m_parent; // Temporary, until the CUDTSocket class is merged with CUDT
Expand Down
15 changes: 11 additions & 4 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3525,8 +3525,8 @@ size_t CUDTGroup::sendBackup_TryActivateStandbyIfNeeded(
return 0;
}

const unsigned num_stable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_FRESH);
const unsigned num_fresh = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_STABLE);
const unsigned num_stable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_STABLE);
const unsigned num_fresh = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_FRESH);

if (num_stable + num_fresh == 0)
{
Expand Down Expand Up @@ -3571,6 +3571,10 @@ size_t CUDTGroup::sendBackup_TryActivateStandbyIfNeeded(

try
{
CUDT& cudt = d->ps->core();
// Take source rate estimation from an active member (needed for the input rate estimation mode).
cudt.setRateEstimator(w_sendBackupCtx.getRateEstimate());

// TODO: At this point all packets that could be sent
// are located in m_SenderBuffer. So maybe just use sendBackupRexmit()?
if (w_curseq == SRT_SEQNO_NONE)
Expand All @@ -3582,7 +3586,7 @@ size_t CUDTGroup::sendBackup_TryActivateStandbyIfNeeded(
HLOGC(gslog.Debug,
log << "grp/sendBackup: ... trying @" << d->id << " - sending the VERY FIRST message");

stat = d->ps->core().sendmsg2(buf, len, (w_mc));
stat = cudt.sendmsg2(buf, len, (w_mc));
if (stat != -1)
{
// This will be no longer used, but let it stay here.
Expand All @@ -3599,7 +3603,7 @@ size_t CUDTGroup::sendBackup_TryActivateStandbyIfNeeded(
<< " collected messages...");
// Note: this will set the currently required packet
// because it has been just freshly added to the sender buffer
stat = sendBackupRexmit(d->ps->core(), (w_mc));
stat = sendBackupRexmit(cudt, (w_mc));
}
++num_activated;
}
Expand Down Expand Up @@ -4384,6 +4388,9 @@ int CUDTGroup::sendBackup_SendOverActive(const char* buf, int len, SRT_MSGCTRL&
{
++w_nsuccessful;
w_maxActiveWeight = max(w_maxActiveWeight, d->weight);

if (u.m_pSndBuffer)
w_sendBackupCtx.setRateEstimate(u.m_pSndBuffer->getRateEstimator());
}
else if (erc == SRT_EASYNCSND)
{
Expand Down
5 changes: 5 additions & 0 deletions srtcore/group_backup.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,16 @@ namespace groups

std::string printMembers() const;

void setRateEstimate(const CRateEstimator& rate) { m_rateEstimate = rate; }

const CRateEstimator& getRateEstimate() const { return m_rateEstimate; }

private:
std::vector<BackupMemberStateEntry> m_memberStates; // TODO: consider std::map here?
unsigned m_stateCounter[BKUPST_E_SIZE];
uint16_t m_activeMaxWeight;
uint16_t m_standbyMaxWeight;
CRateEstimator m_rateEstimate; // The rate estimator state of the active link to copy to a backup on activation.
};

} // namespace groups
Expand Down