Skip to content

Commit

Permalink
[core] Refactored the core stats structure (#2212).
Browse files Browse the repository at this point in the history
Group stats are moved to the new structure.
  • Loading branch information
maxsharabayko authored Dec 20, 2021
1 parent 3558cd0 commit 5f7bc23
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 392 deletions.
7 changes: 0 additions & 7 deletions srtcore/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,13 +503,6 @@ bool SrtParseConfig(string s, SrtConfig& w_config)
return true;
}

uint64_t PacketMetric::fullBytes()
{
static const int PKT_HDR_SIZE = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE;
return bytes + pkts * PKT_HDR_SIZE;
}


namespace srt_logging
{

Expand Down
86 changes: 0 additions & 86 deletions srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1414,90 +1414,4 @@ inline std::string SrtVersionString(int version)

bool SrtParseConfig(std::string s, srt::SrtConfig& w_config);

struct PacketMetric
{
uint32_t pkts;
uint64_t bytes;

void update(uint64_t size)
{
++pkts;
bytes += size;
}

void update(size_t mult, uint64_t value)
{
pkts += (uint32_t) mult;
bytes += mult * value;
}

uint64_t fullBytes();
};

template <class METRIC_TYPE>
struct MetricOp;

template <class METRIC_TYPE>
struct MetricUsage
{
METRIC_TYPE local;
METRIC_TYPE total;

void Clear()
{
MetricOp<METRIC_TYPE>::Clear(local);
}

void Init()
{
MetricOp<METRIC_TYPE>::Clear(total);
Clear();
}

void Update(uint64_t value)
{
local += value;
total += value;
}

void UpdateTimes(size_t mult, uint64_t value)
{
local += mult * value;
total += mult * value;
}
};

template <>
inline void MetricUsage<PacketMetric>::Update(uint64_t value)
{
local.update(value);
total.update(value);
}

template <>
inline void MetricUsage<PacketMetric>::UpdateTimes(size_t mult, uint64_t value)
{
local.update(mult, value);
total.update(mult, value);
}

template <class METRIC_TYPE>
struct MetricOp
{
static void Clear(METRIC_TYPE& m)
{
m = 0;
}
};

template <>
struct MetricOp<PacketMetric>
{
static void Clear(PacketMetric& p)
{
p.pkts = 0;
p.bytes = 0;
}
};

#endif
292 changes: 95 additions & 197 deletions srtcore/core.cpp

Large diffs are not rendered by default.

66 changes: 6 additions & 60 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ modified by
#include "utilities.h"
#include "logger_defs.h"

#include "stats.h"

#include <haicrypt.h>


Expand Down Expand Up @@ -1065,73 +1067,17 @@ class CUDT
struct CoreStats
{
time_point tsStartTime; // timestamp when the UDT entity is started
int64_t sentTotal; // total number of sent data packets, including retransmissions
int64_t sentUniqTotal; // total number of sent data packets, excluding rexmit and filter control
int64_t recvTotal; // total number of received packets
int64_t recvUniqTotal; // total number of received and delivered packets
int sndLossTotal; // total number of lost packets (sender side)
int rcvLossTotal; // total number of lost packets (receiver side)
int retransTotal; // total number of retransmitted packets
int sentACKTotal; // total number of sent ACK packets
int recvACKTotal; // total number of received ACK packets
int sentNAKTotal; // total number of sent NAK packets
int recvNAKTotal; // total number of received NAK packets
int sndDropTotal;
int rcvDropTotal;
uint64_t bytesSentTotal; // total number of bytes sent, including retransmissions
uint64_t bytesSentUniqTotal; // total number of bytes sent, including retransmissions
uint64_t bytesRecvTotal; // total number of received bytes
uint64_t bytesRecvUniqTotal; // total number of received bytes
uint64_t rcvBytesLossTotal; // total number of loss bytes (estimate)
uint64_t bytesRetransTotal; // total number of retransmitted bytes
uint64_t sndBytesDropTotal;
uint64_t rcvBytesDropTotal;
int m_rcvUndecryptTotal;
uint64_t m_rcvBytesUndecryptTotal;

int sndFilterExtraTotal;
int rcvFilterExtraTotal;
int rcvFilterSupplyTotal;
int rcvFilterLossTotal;
stats::Sender sndr; // sender statistics
stats::Receiver rcvr; // receiver statistics

int64_t m_sndDurationTotal; // total real time for sending

time_point tsLastSampleTime; // last performance sample time
int64_t traceSent; // number of packets sent in the last trace interval
int64_t traceSentUniq; // number of original packets sent in the last trace interval
int64_t traceRecv; // number of packets received in the last trace interval
int64_t traceRecvUniq; // number of packets received AND DELIVERED in the last trace interval
int traceSndLoss; // number of lost packets in the last trace interval (sender side)
int traceRcvLoss; // number of lost packets in the last trace interval (receiver side)
int traceRetrans; // number of retransmitted packets in the last trace interval
int sentACK; // number of ACKs sent in the last trace interval
int recvACK; // number of ACKs received in the last trace interval
int sentNAK; // number of NAKs sent in the last trace interval
int recvNAK; // number of NAKs received in the last trace interval
int traceSndDrop;
int traceRcvDrop;
int traceRcvRetrans;
int traceReorderDistance;
double traceBelatedTime;
int64_t traceRcvBelated;
uint64_t traceBytesSent; // number of bytes sent in the last trace interval
uint64_t traceBytesSentUniq; // number of bytes sent in the last trace interval
uint64_t traceBytesRecv; // number of bytes sent in the last trace interval
uint64_t traceBytesRecvUniq; // number of bytes sent in the last trace interval
uint64_t traceRcvBytesLoss; // number of bytes bytes lost in the last trace interval (estimate)
uint64_t traceBytesRetrans; // number of bytes retransmitted in the last trace interval
uint64_t traceSndBytesDrop;
uint64_t traceRcvBytesDrop;
int traceRcvUndecrypt;
uint64_t traceRcvBytesUndecrypt;

int sndFilterExtra;
int rcvFilterExtra;
int rcvFilterSupply;
int rcvFilterLoss;


int64_t sndDuration; // real time for sending
time_point sndDurationCounter; // timers to record the sending Duration
time_point sndDurationCounter; // timers to record the sending Duration
} m_stats;

public:
Expand Down
1 change: 1 addition & 0 deletions srtcore/filelist.maf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ queue.h
congctl.h
socketconfig.h
srt_compat.h
stats.h
threadname.h
tsbpd_time.h
utilities.h
Expand Down
40 changes: 20 additions & 20 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1692,7 +1692,7 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
}

// Now that at least one link has succeeded, update sending stats.
m_stats.sent.Update(len);
m_stats.sent.count(len);

// Pity that the blocking mode only determines as to whether this function should
// block or not, but the epoll flags must be updated regardless of the mode.
Expand Down Expand Up @@ -2259,7 +2259,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
pos->packet.clear();

// Update stats as per delivery
m_stats.recv.Update(len);
m_stats.recv.count(len);
updateAvgPayloadSize(len);

// We predict to have only one packet ahead, others are pending to be reported by tsbpd.
Expand Down Expand Up @@ -2491,7 +2491,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
log << "group/recv: @" << id << " %" << mctrl.pktseq << " #" << mctrl.msgno
<< " BEHIND base=%" << m_RcvBaseSeqNo << " - discarding");
// The sequence is recorded, the packet has to be discarded.
m_stats.recvDiscard.Update(stat);
m_stats.recvDiscard.count(stat);
continue;
}

Expand Down Expand Up @@ -2527,7 +2527,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
fillGroupData((w_mc), mctrl);

// Update stats as per delivery
m_stats.recv.Update(output_size);
m_stats.recv.count(output_size);
updateAvgPayloadSize(output_size);

// Record, but do not update yet, until all sockets are handled.
Expand Down Expand Up @@ -2681,7 +2681,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
const int32_t jump = (CSeqNo(slowest_kangaroo->second.mctrl.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1;
if (jump > 0)
{
m_stats.recvDrop.UpdateTimes(jump, avgRcvPacketSize());
m_stats.recvDrop.count(stats::BytesPackets(jump * static_cast<uint64_t>(avgRcvPacketSize()), jump));
LOGC(grlog.Warn,
log << "@" << m_GroupID << " GROUP RCV-DROPPED " << jump << " packet(s): seqno %"
<< m_RcvBaseSeqNo << " to %" << slowest_kangaroo->second.mctrl.pktseq);
Expand All @@ -2703,7 +2703,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
pkt.clear();

// Update stats as per delivery
m_stats.recv.Update(len);
m_stats.recv.count(len);
updateAvgPayloadSize(len);

// It is unlikely to have a packet ahead because usually having one packet jumped-ahead
Expand Down Expand Up @@ -2830,21 +2830,21 @@ void CUDTGroup::bstatsSocket(CBytePerfMon* perf, bool clear)

perf->msTimeStamp = count_milliseconds(currtime - m_tsStartTime);

perf->pktSentUnique = m_stats.sent.local.pkts;
perf->pktRecvUnique = m_stats.recv.local.pkts;
perf->pktRcvDrop = m_stats.recvDrop.local.pkts;
perf->pktSentUnique = m_stats.sent.trace.count();
perf->pktRecvUnique = m_stats.recv.trace.count();
perf->pktRcvDrop = m_stats.recvDrop.trace.count();

perf->byteSentUnique = m_stats.sent.local.fullBytes();
perf->byteRecvUnique = m_stats.recv.local.fullBytes();
perf->byteRcvDrop = m_stats.recvDrop.local.fullBytes();
perf->byteSentUnique = m_stats.sent.trace.bytesWithHdr();
perf->byteRecvUnique = m_stats.recv.trace.bytesWithHdr();
perf->byteRcvDrop = m_stats.recvDrop.trace.bytesWithHdr();

perf->pktSentUniqueTotal = m_stats.sent.total.pkts;
perf->pktRecvUniqueTotal = m_stats.recv.total.pkts;
perf->pktRcvDropTotal = m_stats.recvDrop.total.pkts;
perf->pktSentUniqueTotal = m_stats.sent.total.count();
perf->pktRecvUniqueTotal = m_stats.recv.total.count();
perf->pktRcvDropTotal = m_stats.recvDrop.total.count();

perf->byteSentUniqueTotal = m_stats.sent.total.fullBytes();
perf->byteRecvUniqueTotal = m_stats.recv.total.fullBytes();
perf->byteRcvDropTotal = m_stats.recvDrop.total.fullBytes();
perf->byteSentUniqueTotal = m_stats.sent.total.bytesWithHdr();
perf->byteRecvUniqueTotal = m_stats.recv.total.bytesWithHdr();
perf->byteRcvDropTotal = m_stats.recvDrop.total.bytesWithHdr();

const double interval = static_cast<double>(count_microseconds(currtime - m_stats.tsLastSampleTime));
perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval;
Expand Down Expand Up @@ -3165,7 +3165,7 @@ CUDTGroup::BackupMemberState CUDTGroup::sendBackup_QualifyActiveState(const gli_
}

enterCS(u.m_StatsLock);
const int64_t drop_total = u.m_stats.sndDropTotal;
const int64_t drop_total = u.m_stats.sndr.dropped.total.count();
leaveCS(u.m_StatsLock);

const bool have_new_drops = d->pktSndDropTotal != drop_total;
Expand Down Expand Up @@ -4039,7 +4039,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
}

// At least one link has succeeded, update sending stats.
m_stats.sent.Update(len);
m_stats.sent.count(len);

// Now fill in the socket table. Check if the size is enough, if not,
// then set the pointer to NULL and set the correct size.
Expand Down
30 changes: 14 additions & 16 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,31 +697,29 @@ class CUDTGroup
time_point tsActivateTime; // Time when this group sent or received the first data packet
time_point tsLastSampleTime; // Time reset when clearing stats

MetricUsage<PacketMetric> sent; // number of packets sent from the application
MetricUsage<PacketMetric> recv; // number of packets delivered from the group to the application
MetricUsage<PacketMetric>
recvDrop; // number of packets dropped by the group receiver (not received from any member)
MetricUsage<PacketMetric> recvDiscard; // number of packets discarded as already delivered
stats::Metric<stats::BytesPackets> sent; // number of packets sent from the application
stats::Metric<stats::BytesPackets> recv; // number of packets delivered from the group to the application
stats::Metric<stats::BytesPackets> recvDrop; // number of packets dropped by the group receiver (not received from any member)
stats::Metric<stats::BytesPackets> recvDiscard; // number of packets discarded as already delivered

void init()
{
tsActivateTime = srt::sync::steady_clock::time_point();
sent.Init();
recv.Init();
recvDrop.Init();
recvDiscard.Init();

reset();
tsLastSampleTime = srt::sync::steady_clock::now();
sent.reset();
recv.reset();
recvDrop.reset();
recvDiscard.reset();
}

void reset()
{
sent.Clear();
recv.Clear();
recvDrop.Clear();
recvDiscard.Clear();

tsLastSampleTime = srt::sync::steady_clock::now();

sent.resetTrace();
recv.resetTrace();
recvDrop.resetTrace();
recvDiscard.resetTrace();
}
} m_stats;

Expand Down
9 changes: 3 additions & 6 deletions srtcore/packetfilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ void srt::PacketFilter::receive(CUnit* unit, std::vector<CUnit*>& w_incoming, lo
{
// Packet not to be passthru, update stats
ScopedLock lg(m_parent->m_StatsLock);
++m_parent->m_stats.rcvFilterExtra;
++m_parent->m_stats.rcvFilterExtraTotal;
m_parent->m_stats.rcvr.recvdFilterExtra.count(1);
}

// w_loss_seqs enters empty into this function and can be only filled here. XXX ASSERT?
Expand All @@ -152,8 +151,7 @@ void srt::PacketFilter::receive(CUnit* unit, std::vector<CUnit*>& w_incoming, lo
if (dist > 0)
{
ScopedLock lg(m_parent->m_StatsLock);
m_parent->m_stats.rcvFilterLoss += dist;
m_parent->m_stats.rcvFilterLossTotal += dist;
m_parent->m_stats.rcvr.lossFilter.count(dist);
}
else
{
Expand All @@ -171,8 +169,7 @@ void srt::PacketFilter::receive(CUnit* unit, std::vector<CUnit*>& w_incoming, lo
InsertRebuilt(w_incoming, m_unitq);

ScopedLock lg(m_parent->m_StatsLock);
m_parent->m_stats.rcvFilterSupply += nsupply;
m_parent->m_stats.rcvFilterSupplyTotal += nsupply;
m_parent->m_stats.rcvr.suppliedByFilter.count(nsupply);
}

// Now that all units have been filled as they should be,
Expand Down
Loading

0 comments on commit 5f7bc23

Please sign in to comment.