forked from Chinachu/BonDriver_Mirakurun
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathGrabTsData.cpp
130 lines (113 loc) · 3.55 KB
/
GrabTsData.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//------------------------------------------------------------------------------
// File: GrabTsData.cpp
// Implementation of GrabTsData
//------------------------------------------------------------------------------
#include "GrabTsData.h"
// Put TS data into the ring buffer
//
// Copies up to dwSize bytes from pSrc into the ring buffer starting at the
// current push position, wrapping to the start of the buffer when the end is
// reached. One byte of the ring is always left unused, so only
// (free space - 1) bytes can be stored; input beyond that is dropped.
//
//   pSrc   : source TS data
//   dwSize : number of bytes available at pSrc
//
// Returns FALSE when there is nothing to store (NULL source or zero size),
// TRUE otherwise -- including when the input was truncated or dropped
// because the ring buffer was full.
BOOL GrabTsData::put_TsStream(BYTE *pSrc, DWORD dwSize)
{
    if (!pSrc || dwSize < 1)
        return FALSE;

    // accumulate number of TS data received (read and reset by get_Bitrate);
    // the full dwSize is counted even if part of it does not fit
    std::atomic_fetch_add(&m_nAccumData, dwSize);

    // take one snapshot of the push and pull positions and use it throughout
    DWORD nPush = std::atomic_load(&m_nPush);
    const DWORD nPull = std::atomic_load(&m_nPull);

    // copy TS data to the ring buffer
    const DWORD nTail = RING_BUF_SIZE - nPush; // size between the current position and the buffer end
    DWORD nData = (RING_BUF_SIZE + nPush - nPull) % RING_BUF_SIZE; // size of TS data stored
    nData = RING_BUF_SIZE - nData - 1; // size of available buffer (empty buffer - 1)
    nData = min(nData, dwSize);
    if (nData < nTail) {
        // fits before the end of the buffer: single copy
        CopyMemory(m_pBuf + nPush, pSrc, nData);
        nPush += nData;
    }
    else {
        // reaches the end of the buffer: fill the tail, then continue from
        // the start. The destination is based on the nPush snapshot so both
        // copies and the new position agree with each other.
        CopyMemory(m_pBuf + nPush, pSrc, nTail);
        CopyMemory(m_pBuf, pSrc + nTail, nData - nTail);
        nPush = nData - nTail;
    }

    // update the push position
    std::atomic_store(&m_nPush, nPush);

    // set WaitTsStream to the signaled state
    if (m_phOnStreamEvent)
        SetEvent(*m_phOnStreamEvent);
    return TRUE;
}
// Get TS data from the ring buffer
//
// Moves up to DATA_BUF_SIZE bytes of buffered TS data from the ring buffer
// into the internal destination buffer m_pDst and advances the pull position.
//
//   ppDst     : receives m_pDst
//   pdwSize   : receives the number of bytes copied (0 when nothing is buffered)
//   pdwRemain : optional; receives CEIL(bytes still buffered, DATA_BUF_SIZE)
//               (presumably the number of remaining blocks -- see the CEIL macro)
//
// Returns FALSE, without touching the output arguments, when a purge request
// was pending and has just been carried out; TRUE otherwise.
BOOL GrabTsData::get_TsStream(BYTE **ppDst, DWORD *pdwSize, DWORD *pdwRemain)
{
    // a pending purge drops everything buffered so far by moving the pull
    // position onto the push position
    if (std::atomic_load(&m_bPurge)) {
        std::atomic_store(&m_nPull, std::atomic_load(&m_nPush));
        std::atomic_store(&m_nAccumData, 0); // reset bitrate
        std::atomic_store(&m_bPurge, FALSE);
        return FALSE;
    }

    // snapshot of both ring positions
    const DWORD nWritePos = std::atomic_load(&m_nPush);
    DWORD nReadPos = std::atomic_load(&m_nPull);

    // bytes currently stored in the ring
    DWORD nChunk = (RING_BUF_SIZE + nWritePos - nReadPos) % RING_BUF_SIZE;
    DWORD nLeft = nChunk;

    if (nChunk != 0) {
        const DWORD nToEnd = RING_BUF_SIZE - nReadPos; // bytes from the read position to the buffer end
        nChunk = min(nChunk, DATA_BUF_SIZE);

        if (nChunk >= nToEnd) {
            // the chunk reaches the buffer end: copy the tail, then the rest
            // from the start of the ring
            CopyMemory(m_pDst, m_pBuf + nReadPos, nToEnd);
            CopyMemory(m_pDst + nToEnd, m_pBuf, nChunk - nToEnd);
            nReadPos = nChunk - nToEnd;
        } else {
            // contiguous chunk: one copy is enough
            CopyMemory(m_pDst, m_pBuf + nReadPos, nChunk);
            nReadPos += nChunk;
        }

        nLeft -= nChunk;

        // publish the new pull position
        std::atomic_store(&m_nPull, nReadPos);
    }

    // hand the results back to the caller
    *ppDst = m_pDst;
    *pdwSize = nChunk;
    if (pdwRemain)
        *pdwRemain = CEIL(nLeft, DATA_BUF_SIZE);
    return TRUE;
}
// Purge TS data
//
// Only raises the purge flag; the buffered data itself is discarded by the
// next call to get_TsStream, which checks and clears the flag.
// Always returns TRUE.
BOOL GrabTsData::purge_TsStream(void)
{
    m_bPurge.store(TRUE);
    return TRUE;
}
// Get the number of TS data blocks in the ring buffer
//
//   pdwRemain : optional; receives CEIL(bytes currently buffered, DATA_BUF_SIZE)
//
// Always returns TRUE; a NULL pdwRemain makes the call a no-op.
BOOL GrabTsData::get_ReadyCount(DWORD *pdwRemain)
{
    if (!pdwRemain)
        return TRUE;

    // snapshot of both ring positions
    const DWORD nWritePos = std::atomic_load(&m_nPush);
    const DWORD nReadPos = std::atomic_load(&m_nPull);

    // bytes stored between the pull and push positions, modulo the ring size
    const DWORD nStored = (RING_BUF_SIZE + nWritePos - nReadPos) % RING_BUF_SIZE;
    *pdwRemain = CEIL(nStored, DATA_BUF_SIZE);
    return TRUE;
}
// Calculate bitrate
//
// Reports the receive rate in Mbps (bits per second / 1024 / 1024), capped at
// 100. The rate is recomputed from the byte count accumulated in m_nAccumData
// only when at least 1000 ms have passed since the previous recomputation;
// between recomputations the last computed value is returned.
//
//   pfBitrate : receives the bitrate
//
// Returns TRUE on success, FALSE if pfBitrate is NULL.
//
// NOTE(review): the cached rate and timestamp are function-local statics, so
// they are shared by every GrabTsData instance and are not protected against
// concurrent callers -- confirm that a single instance/thread calls this.
BOOL GrabTsData::get_Bitrate(float *pfBitrate)
{
    if (!pfBitrate)
        return FALSE;

    static double dBitrate = 0;
    static uint64_t ui64LastTime = GetTickCount64();

    const uint64_t ui64Now = GetTickCount64(); // ms
    const uint64_t ui64Duration = ui64Now - ui64LastTime;
    if (ui64Duration >= 1000) {
        // Read and reset the byte counter in one atomic step, then convert in
        // floating point from the start: dividing the integer byte count by
        // the integer duration would truncate bytes/ms before scaling.
        const double dBytes = static_cast<double>(std::atomic_exchange(&m_nAccumData, 0));
        dBitrate = dBytes * 8.0 * 1000.0 / static_cast<double>(ui64Duration) / (1024.0 * 1024.0); // Mbps
        ui64LastTime = ui64Now;
    }

    *pfBitrate = static_cast<float>(min(dBitrate, 100.0));
    return TRUE;
}