diff --git a/README.md b/README.md
index 9681f71508..41aae22caf 100755
--- a/README.md
+++ b/README.md
@@ -485,6 +485,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.
## History
+* v2.0, 2014-12-05, fix [#251](https://github.com/winlinvip/simple-rtmp-server/issues/251), 9k+ clients, use fast cache for msgs queue. 2.0.57
* v2.0, 2014-12-04, fix [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), add mw(merged-write) config. 2.0.53
* v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52.
* v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index 0de7e9ba6f..b2f5199a65 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -605,7 +605,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
- if ((ret = consumer->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
+ if ((ret = consumer->dump_packets(&msgs, &count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}
diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp
index 595dfbe2e7..228cb0d327 100644
--- a/trunk/src/app/srs_app_source.cpp
+++ b/trunk/src/app/srs_app_source.cpp
@@ -41,6 +41,7 @@ using namespace std;
#include
#include
#include
+#include
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 40
@@ -166,22 +167,12 @@ SrsMessageQueue::~SrsMessageQueue()
clear();
}
-int SrsMessageQueue::count()
-{
- return (int)msgs.size();
-}
-
-int SrsMessageQueue::duration()
-{
- return (int)(av_end_time - av_start_time);
-}
-
void SrsMessageQueue::set_queue_size(double queue_size)
{
queue_size_ms = (int)(queue_size * 1000);
}
-int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
+int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
int ret = ERROR_SUCCESS;
@@ -196,6 +187,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
msgs.push_back(msg);
while (av_end_time - av_start_time > queue_size_ms) {
+ // notice the caller queue already overflow and shrinked.
+ if (is_overflow) {
+ *is_overflow = true;
+ }
+
shrink();
}
@@ -305,10 +301,20 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
+
+ mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS);
+ mw_count = 0;
+ mw_first_pkt = mw_last_pkt = 0;
}
SrsConsumer::~SrsConsumer()
{
+ if (mw_cache) {
+ mw_cache->free(mw_count);
+ mw_count = 0;
+ }
+ srs_freep(mw_cache);
+
source->on_consumer_destroy(this);
srs_freep(jitter);
srs_freep(queue);
@@ -341,22 +347,53 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
}
}
- if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
- return ret;
+ // use fast cache if available
+ if (mw_count < mw_cache->max) {
+ // update fast cache timestamps
+ if (mw_count == 0) {
+ mw_first_pkt = msg->header.timestamp;
+ }
+ mw_last_pkt = msg->header.timestamp;
+
+ mw_cache->msgs[mw_count++] = msg;
+ } else{
+ // fast cache is full, use queue.
+ bool is_overflow = false;
+ if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) {
+ return ret;
+ }
+ // when overflow, clear cache and refresh the fast cache.
+ if (is_overflow) {
+ mw_cache->free(mw_count);
+ if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) {
+ return ret;
+ }
+ }
}
// fire the mw when msgs is enough.
- if (mw_waiting && queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
- st_cond_signal(mw_wait);
- mw_waiting = false;
+ if (mw_waiting) {
+ // when fast cache not overflow, always flush.
+ // so we donot care about the queue.
+ bool fast_cache_overflow = mw_count >= mw_cache->max;
+ int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
+ bool match_min_msgs = mw_count > mw_min_msgs;
+
+ // when fast cache overflow, or duration ok, signal to flush.
+ if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
+ st_cond_signal(mw_wait);
+ mw_waiting = false;
+ }
}
return ret;
}
-int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
+int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count)
{
- srs_assert(max_count > 0);
+ int ret =ERROR_SUCCESS;
+
+ srs_assert(msgs->max > 0);
if (should_update_source_id) {
srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
@@ -365,10 +402,24 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
// paused, return nothing.
if (paused) {
- return ERROR_SUCCESS;
+ return ret;
+ }
+
+ // only dumps an whole array to msgs.
+ for (int i = 0; i < mw_count; i++) {
+ msgs->msgs[i] = mw_cache->msgs[i];
}
+ *count = mw_count;
- return queue->dump_packets(max_count, pmsgs, count);
+ // when fast cache is not filled,
+ // we donot check the queue, direclty zero fast cache.
+ if (mw_count < mw_cache->max) {
+ mw_count = 0;
+ mw_first_pkt = mw_last_pkt = 0;
+ return ret;
+ }
+
+ return dumps_queue_to_fast_cache();
}
void SrsConsumer::wait(int nb_msgs, int duration)
@@ -376,14 +427,20 @@ void SrsConsumer::wait(int nb_msgs, int duration)
mw_min_msgs = nb_msgs;
mw_duration = duration;
- // already ok, donot wait.
- if (queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
+ // when fast cache not overflow, always flush.
+ // so we donot care about the queue.
+ bool fast_cache_overflow = mw_count >= mw_cache->max;
+ int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
+ bool match_min_msgs = mw_count > mw_min_msgs;
+
+ // when fast cache overflow, or duration ok, signal to flush.
+ if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
return;
}
// the enqueue will notify this cond.
mw_waiting = true;
-
+ // wait for msgs to incoming.
st_cond_wait(mw_wait);
}
@@ -397,6 +454,26 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
return ret;
}
+int SrsConsumer::dumps_queue_to_fast_cache()
+{
+ int ret =ERROR_SUCCESS;
+
+ // fill fast cache with queue.
+ if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) {
+ return ret;
+ }
+ // set the timestamp when got message.
+ if (mw_count > 0) {
+ SrsMessage* first_msg = mw_cache->msgs[0];
+ mw_first_pkt = first_msg->header.timestamp;
+
+ SrsMessage* last_msg = mw_cache->msgs[mw_count - 1];
+ mw_last_pkt = last_msg->header.timestamp;
+ }
+
+ return ret;
+}
+
SrsGopCache::SrsGopCache()
{
cached_video_count = 0;
diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp
index 63795b66c0..b2f93eeeca 100644
--- a/trunk/src/app/srs_app_source.hpp
+++ b/trunk/src/app/srs_app_source.hpp
@@ -48,6 +48,7 @@ class SrsRequest;
class SrsStSocket;
class SrsRtmpServer;
class SrsEdgeProxyContext;
+class SrsMessageArray;
#ifdef SRS_AUTO_HLS
class SrsHls;
#endif
@@ -115,14 +116,6 @@ class SrsMessageQueue
SrsMessageQueue();
virtual ~SrsMessageQueue();
public:
- /**
- * get the count of queue.
- */
- virtual int count();
- /**
- * get duration of queue.
- */
- virtual int duration();
/**
* set the queue size
* @param queue_size the queue size in seconds.
@@ -132,8 +125,9 @@ class SrsMessageQueue
/**
* enqueue the message, the timestamp always monotonically.
* @param msg, the msg to enqueue, user never free it whatever the return code.
+ * @param is_overflow, whether overflow and shrinked. NULL to ignore.
*/
- virtual int enqueue(SrsSharedPtrMessage* msg);
+ virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
/**
* get packets in consumer queue.
* @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
@@ -168,6 +162,14 @@ class SrsConsumer
bool mw_waiting;
int mw_min_msgs;
int mw_duration;
+ // use fast cache for msgs
+ // @see https://github.com/winlinvip/simple-rtmp-server/issues/251
+ SrsMessageArray* mw_cache;
+ // the count of msg in fast cache.
+ int mw_count;
+ // the packet time in fast cache.
+ int64_t mw_first_pkt;
+ int64_t mw_last_pkt;
public:
SrsConsumer(SrsSource* _source);
virtual ~SrsConsumer();
@@ -197,11 +199,11 @@ class SrsConsumer
virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag);
/**
* get packets in consumer queue.
- * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
- * @count the count in array, output param.
+ * @param msgs the msgs array to dump packets to send.
+ * @param count the count in array, output param.
* @max_count the max count to dequeue, must be positive.
*/
- virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
+ virtual int dump_packets(SrsMessageArray* msgs, int* count);
/**
* wait for messages incomming, atleast nb_msgs and in duration.
* @param nb_msgs the messages count to wait.
@@ -212,6 +214,12 @@ class SrsConsumer
* when client send the pause message.
*/
virtual int on_play_client_pause(bool is_pause);
+private:
+ /**
+ * dumps the queue to fast cache,
+ * when fast cache is clear or queue is overflow.
+ */
+ virtual int dumps_queue_to_fast_cache();
};
/**
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index 7ac574fc2c..3b039b76cc 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
-#define VERSION_REVISION 56
+#define VERSION_REVISION 57
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp
index 97c5191ba6..6b1f49cfdb 100644
--- a/trunk/src/core/srs_core_performance.hpp
+++ b/trunk/src/core/srs_core_performance.hpp
@@ -75,14 +75,24 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* @see SrsConfig::get_mw_sleep_ms()
* @remark the mw sleep and msgs to send, maybe:
* mw_sleep msgs iovs
-* 350 24/48 48/84
-* 500 24/48 48/84
-* 800 42/64 84/128
-* 1000 64/85 128/170
-* 1200 65/86 130/172
-* 1500 87/110 174/220
-* 1800 106/128 212/256
-* 2000 134/142 268/284
+* 350 43 86
+* 400 44 88
+* 500 46 92
+* 600 46 92
+* 700 82 164
+* 800 81 162
+* 900 80 160
+* 1000 88 176
+* 1100 91 182
+* 1200 89 178
+* 1300 119 238
+* 1400 120 240
+* 1500 119 238
+* 1600 131 262
+* 1700
+* 1800
+* 1900
+* 2000
*/
// the default config of mw.
#define SRS_PERF_MW_SLEEP 350
diff --git a/trunk/src/rtmp/srs_protocol_msg_array.cpp b/trunk/src/rtmp/srs_protocol_msg_array.cpp
index 4c002bcf84..699603be4e 100644
--- a/trunk/src/rtmp/srs_protocol_msg_array.cpp
+++ b/trunk/src/rtmp/srs_protocol_msg_array.cpp
@@ -32,10 +32,7 @@ SrsMessageArray::SrsMessageArray(int max_msgs)
msgs = new SrsMessage*[max_msgs];
max = max_msgs;
- // initialize
- for (int i = 0; i < max_msgs; i++) {
- msgs[i] = NULL;
- }
+ zero(max_msgs);
}
SrsMessageArray::~SrsMessageArray()
@@ -46,4 +43,23 @@ SrsMessageArray::~SrsMessageArray()
srs_freep(msgs);
}
+void SrsMessageArray::free(int count)
+{
+ // initialize
+ for (int i = 0; i < count; i++) {
+ SrsMessage* msg = msgs[i];
+ srs_freep(msg);
+
+ msgs[i] = NULL;
+ }
+}
+
+void SrsMessageArray::zero(int count)
+{
+ // initialize
+ for (int i = 0; i < count; i++) {
+ msgs[i] = NULL;
+ }
+}
+
diff --git a/trunk/src/rtmp/srs_protocol_msg_array.hpp b/trunk/src/rtmp/srs_protocol_msg_array.hpp
index c7a9cce65a..aff10def7b 100644
--- a/trunk/src/rtmp/srs_protocol_msg_array.hpp
+++ b/trunk/src/rtmp/srs_protocol_msg_array.hpp
@@ -60,6 +60,16 @@ class SrsMessageArray
* free the msgs not sent out(not NULL).
*/
virtual ~SrsMessageArray();
+public:
+ /**
+ * free specified count of messages.
+ */
+ virtual void free(int count);
+private:
+ /**
+ * zero initialize the message array.
+ */
+ virtual void zero(int count);
};
#endif