Skip to content

Commit

Permalink
SmartPtr: Refine SRT source manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 19, 2024
1 parent d33099e commit 1829a07
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 25 deletions.
9 changes: 9 additions & 0 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ using namespace std;
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
#ifdef SRS_SRT
#include <srs_app_srt_source.hpp>
#endif

SrsSignalManager* SrsSignalManager::instance = NULL;

Expand Down Expand Up @@ -809,6 +812,12 @@ srs_error_t SrsServer::start(SrsWaitGroup* wg)
return srs_error_wrap(err, "sources");
}

#ifdef SRS_SRT
if ((err = _srs_srt_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
}
#endif

if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
Expand Down
12 changes: 6 additions & 6 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1867,7 +1867,7 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
if (source->stream_is_dead()) {
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
srs_trace("Live: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
Expand Down Expand Up @@ -1954,11 +1954,6 @@ srs_error_t SrsLiveSource::cycle()

bool SrsLiveSource::stream_is_dead()
{
// unknown state?
if (stream_die_at_ == 0) {
return false;
}

// still publishing?
if (!_can_publish || !publish_edge->can_publish()) {
return false;
Expand Down Expand Up @@ -2742,6 +2737,11 @@ void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer)
if (consumers.empty()) {
play_edge->on_all_client_stop();

// If no publishers, the stream is die.
if (_can_publish) {
stream_die_at_ = srs_get_system_time();
}

// For edge server, the stream die when the last player quit, because the edge stream is created by player
// activities, so it should die when all players quit.
if (_srs_config->get_vhost_is_edge(req->vhost)) {
Expand Down
91 changes: 75 additions & 16 deletions trunk/src/app/srs_app_srt_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ using namespace std;
#include <srs_app_statistic.hpp>
#include <srs_app_pithy_print.hpp>

// the time to cleanup source.
#define SRS_SRT_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS)

SrsSrtPacket::SrsSrtPacket()
{
shared_buffer_ = NULL;
Expand Down Expand Up @@ -95,11 +98,56 @@ int SrsSrtPacket::size()
SrsSrtSourceManager::SrsSrtSourceManager()
{
lock = srs_mutex_new();
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
}

SrsSrtSourceManager::~SrsSrtSourceManager()
{
srs_mutex_destroy(lock);
srs_freep(timer_);
}

srs_error_t SrsSrtSourceManager::initialize()
{
return setup_ticks();
}

srs_error_t SrsSrtSourceManager::setup_ticks()
{
srs_error_t err = srs_success;

if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}

if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}

return err;
}

srs_error_t SrsSrtSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;

std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSharedPtr<SrsSrtSource>& source = it->second;

// When source expired, remove it.
// @see https://github.com/ossrs/srs/issues/713
if (source->stream_is_dead()) {
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("SRT: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
}
}

return err;
}

srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps)
Expand Down Expand Up @@ -137,19 +185,6 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<Srs
return err;
}

void SrsSrtSourceManager::eliminate(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
}
}

SrsSrtSourceManager* _srs_srt_sources = NULL;

SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource* s)
Expand Down Expand Up @@ -873,6 +908,7 @@ SrsSrtSource::SrsSrtSource()
can_publish_ = true;
frame_builder_ = NULL;
bridge_ = NULL;
stream_die_at_ = 0;
}

SrsSrtSource::~SrsSrtSource()
Expand All @@ -899,6 +935,27 @@ srs_error_t SrsSrtSource::initialize(SrsRequest* r)
return err;
}

bool SrsSrtSource::stream_is_dead()
{
// still publishing?
if (!can_publish_) {
return false;
}

// has any consumers?
if (!consumers.empty()) {
return false;
}

// Delay cleanup source.
srs_utime_t now = srs_get_system_time();
if (now < stream_die_at_ + SRS_SRT_SOURCE_CLEANUP) {
return false;
}

return true;
}

srs_error_t SrsSrtSource::on_source_id_changed(SrsContextId id)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -953,6 +1010,8 @@ srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer)
consumer = new SrsSrtConsumer(this);
consumers.push_back(consumer);

stream_die_at_ = 0;

return err;
}

Expand All @@ -976,7 +1035,7 @@ void SrsSrtSource::on_consumer_destroy(SrsSrtConsumer* consumer)

// Destroy and cleanup source when no publishers and consumers.
if (can_publish_ && consumers.empty()) {
_srs_srt_sources->eliminate(req);
stream_die_at_ = srs_get_system_time();
}
}

Expand Down Expand Up @@ -1033,8 +1092,8 @@ void SrsSrtSource::on_unpublish()
}

// Destroy and cleanup source when no publishers and consumers.
if (can_publish_ && consumers.empty()) {
_srs_srt_sources->eliminate(req);
if (consumers.empty()) {
stream_die_at_ = srs_get_system_time();
}
}

Expand Down
17 changes: 14 additions & 3 deletions trunk/src/app/srs_app_srt_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <srs_protocol_st.hpp>
#include <srs_app_stream_bridge.hpp>
#include <srs_core_autofree.hpp>
#include <srs_app_hourglass.hpp>

class SrsSharedPtrMessage;
class SrsRequest;
Expand Down Expand Up @@ -47,21 +48,26 @@ class SrsSrtPacket
int actual_buffer_size_;
};

class SrsSrtSourceManager
class SrsSrtSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map< std::string, SrsSharedPtr<SrsSrtSource> > pool;
SrsHourGlass* timer_;
public:
SrsSrtSourceManager();
virtual ~SrsSrtSourceManager();
public:
virtual srs_error_t initialize();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param pps the matched source, if success never be NULL.
virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsSrtSource>& pps);
// Dispose and destroy the source.
virtual void eliminate(SrsRequest* r);
};

// Global singleton instance.
Expand Down Expand Up @@ -156,6 +162,9 @@ class SrsSrtSource
virtual ~SrsSrtSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
public:
// Whether stream is dead, which is no publisher or player.
virtual bool stream_is_dead();
public:
// The source id changed.
virtual srs_error_t on_source_id_changed(SrsContextId id);
Expand Down Expand Up @@ -190,6 +199,8 @@ class SrsSrtSource
// To delivery packets to clients.
std::vector<SrsSrtConsumer*> consumers;
bool can_publish_;
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;
private:
SrsSrtFrameBuilder* frame_builder_;
ISrsStreamBridge* bridge_;
Expand Down

0 comments on commit 1829a07

Please sign in to comment.