Skip to content

Commit

Permalink
Use async worker to free the stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 31, 2024
1 parent 76acb86 commit a151b53
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 30 deletions.
110 changes: 80 additions & 30 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using namespace std;
#include <srs_app_statistic.hpp>
#include <srs_app_recv_thread.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_async_call.hpp>

SrsBufferCache::SrsBufferCache(SrsServer* s, SrsRequest* r)
{
Expand Down Expand Up @@ -987,6 +988,7 @@ bool SrsLiveEntry::is_mp3()
SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr)
{
server = svr;
async_ = new SrsAsyncCallWorker();

mux.hijack(this);
_srs_config->subscribe(this);
Expand All @@ -996,6 +998,9 @@ SrsHttpStreamServer::~SrsHttpStreamServer()
{
mux.unhijack(this);
_srs_config->unsubscribe(this);

async_->stop();
srs_freep(async_);

if (true) {
std::map<std::string, SrsLiveEntry*>::iterator it;
Expand Down Expand Up @@ -1023,6 +1028,10 @@ srs_error_t SrsHttpStreamServer::initialize()
if ((err = initialize_flv_streaming()) != srs_success) {
return srs_error_wrap(err, "http flv stream");
}

if ((err = async_->start()) != srs_success) {
return srs_error_wrap(err, "async start");
}

return err;
}
Expand Down Expand Up @@ -1114,39 +1123,19 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
return;
}

// Free all HTTP resources.
SrsUniquePtr<SrsLiveEntry> entry(it->second);
entry->disposing = true;

SrsUniquePtr<SrsLiveStream> stream(entry->stream);
SrsUniquePtr<SrsBufferCache> cache(entry->cache);

// Notify cache and stream to stop.
if (stream->entry) stream->entry->enabled = false;
stream->expire();
cache->stop();

// Wait for cache and stream to stop.
int i = 0;
for (; i < 1024; i++) {
if (!cache->alive() && !stream->alive()) {
break;
}
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
// Set the entry to disposing, which will prevent the stream to be reused.
SrsLiveEntry* entry = it->second;
if (entry->disposing) {
return;
}
entry->disposing = true;

if (cache->alive() || stream->alive()) {
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
// Use async worker to execute the task, which will destroy the stream.
srs_error_t err = srs_success;
if ((err = async_->execute(new SrsHttpStreamDestroy(&mux, &streamHandlers, sid))) != srs_success) {
srs_warn("http: ignore unmount stream failed, sid=%s, err=%s", sid.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}

// Remove the entry from handlers.
streamHandlers.erase(it);

// Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
// stream stopped for it uses it.
mux.unhandle(entry->mount, stream.get());

srs_trace("http: unmount flv stream for sid=%s, i=%d", sid.c_str(), i);
}

srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
Expand Down Expand Up @@ -1296,3 +1285,64 @@ srs_error_t SrsHttpStreamServer::initialize_flv_entry(std::string vhost)
return err;
}

SrsHttpStreamDestroy::SrsHttpStreamDestroy(SrsHttpServeMux* mux, map<std::string, SrsLiveEntry*>* handlers, string sid)
{
mux_ = mux;
sid_ = sid;
streamHandlers_ = handlers;
}

SrsHttpStreamDestroy::~SrsHttpStreamDestroy()
{
}

srs_error_t SrsHttpStreamDestroy::call()
{
srs_error_t err = srs_success;

std::map<std::string, SrsLiveEntry*>::iterator it = streamHandlers_->find(sid_);
if (it == streamHandlers_->end()) {
return err;
}

// Free all HTTP resources.
SrsUniquePtr<SrsLiveEntry> entry(it->second);
entry->disposing = true;

SrsUniquePtr<SrsLiveStream> stream(entry->stream);
SrsUniquePtr<SrsBufferCache> cache(entry->cache);

// Notify cache and stream to stop.
if (stream->entry) stream->entry->enabled = false;
stream->expire();
cache->stop();

// Wait for cache and stream to stop.
int i = 0;
for (; i < 1024; i++) {
if (!cache->alive() && !stream->alive()) {
break;
}
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
}

if (cache->alive() || stream->alive()) {
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
}

// Remove the entry from handlers.
streamHandlers_->erase(it);

// Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
// stream stopped for it uses it.
mux_->unhandle(entry->mount, stream.get());

srs_trace("http: unmount flv stream for sid=%s, i=%d", sid_.c_str(), i);
return err;
}

string SrsHttpStreamDestroy::to_string()
{
return "destroy";
}

17 changes: 17 additions & 0 deletions trunk/src/app/srs_app_http_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
#include <srs_core.hpp>
#include <srs_app_security.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_async_call.hpp>

#include <vector>

class SrsAacTransmuxer;
class SrsMp3Transmuxer;
class SrsFlvTransmuxer;
class SrsTsTransmuxer;
class SrsAsyncCallWorker;

// A cache for HTTP Live Streaming encoder, to make android(weixin) happy.
class SrsBufferCache : public ISrsCoroutineHandler
Expand Down Expand Up @@ -245,6 +247,7 @@ class SrsHttpStreamServer : public ISrsReloadHandler
{
private:
SrsServer* server;
SrsAsyncCallWorker* async_;
public:
SrsHttpServeMux mux;
// The http live streaming template, to create streams.
Expand All @@ -268,5 +271,19 @@ class SrsHttpStreamServer : public ISrsReloadHandler
virtual srs_error_t initialize_flv_entry(std::string vhost);
};

class SrsHttpStreamDestroy : public ISrsAsyncCallTask
{
private:
std::string sid_;
std::map<std::string, SrsLiveEntry*>* streamHandlers_;
SrsHttpServeMux* mux_;
public:
SrsHttpStreamDestroy(SrsHttpServeMux* mux, map<std::string, SrsLiveEntry*>* handlers, string sid);
virtual ~SrsHttpStreamDestroy();
public:
virtual srs_error_t call();
virtual std::string to_string();
};

#endif

0 comments on commit a151b53

Please sign in to comment.