Skip to content

Commit

Permalink
Forward: add backend config and demo server for dynamic create forwar…
Browse files Browse the repository at this point in the history
…der to other server.(ossrs#1342)
  • Loading branch information
chundonglinlin committed Jan 28, 2022
1 parent 6b7fc6f commit 109fdc7
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 5 deletions.
3 changes: 2 additions & 1 deletion trunk/conf/forward.master.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ srs_log_tank console;
vhost __defaultVhost__ {
forward {
enabled on;
destination 127.0.0.1:19350;
#destination 127.0.0.1:19350;
backend http://127.0.0.1:8085/api/v1/forward;
}
}
168 changes: 168 additions & 0 deletions trunk/research/api-server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,173 @@ def POST(self):
def OPTIONS(self, *args, **kwargs):
enable_crossdomain()

'''
handle the forward requests: dynamic forward url.
'''
class RESTForward(object):
exposed = True

def __init__(self):
self.__forwards = []

self.__forwards.append({
"vhost":"ossrs.net",
"app":"live",
"stream":"livestream",
"url":"push.ossrs.com",
})

self.__forwards.append({
"app":"live",
"stream":"livestream",
"url":"push.ossrs.com",
})

self.__forwards.append({
"app":"live",
"stream":"livestream",
"url":"rtmp://push.ossrs.com/test/teststream?auth_token=123456",
})

def GET(self):
enable_crossdomain()

forwards = {}
return json.dumps(forwards)

'''
for SRS hook: on_forward
on_forward:
when srs reap a dvr file, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_forward",
"server_id": "server_test",
"client_id": 1985,
"ip": "192.168.1.10",
"vhost": "video.test.com",
"app": "live",
"tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
"stream": "livestream",
"param":"?token=xxx&salt=yyy"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success):
0
'''
def POST(self):
enable_crossdomain()

# return the error code in str
code = Error.success

req = cherrypy.request.body.read()
trace("post to forwards, req=%s"%(req))
try:
json_req = json.loads(req)
except Exception, ex:
code = Error.system_parse_json
trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
return json.dumps({"code": int(code), "data": None})

action = json_req["action"]
if action == "on_forward":
return self.__on_forward(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action

return json.dumps({"code": int(code), "data": None})

def OPTIONS(self, *args, **kwargs):
enable_crossdomain()

def __on_forward(self, req):
code = Error.success

trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"]
))

# dynamic create forward config
forwards = []
destinations = []

# handle param: ?forward=xxxxx&auth_token=xxxxx
# 1.delete ?
req_param = req["param"].replace('?', '', 1)

# 2.delete 'forward=xxxxx'
new_req_param = ""
params = req_param.split("&")
for param in params:
result = param.split("=")
if result[0].find("forward") != -1:
destinations.append({
"url": result[1],
})
elif len(new_req_param) > 0:
new_req_param = new_req_param + "&" + param
else:
new_req_param = param

# secne: dynamic config
for forward in self.__forwards:
# vhost exist
if hasattr(forward, "vhost"):
if len(forward["vhost"]) > 0 and req["vhost"] != forward["vhost"]:
continue
# app exist
if hasattr(forward, "app"):
if len(forward["app"]) > 0 and req["app"] != forward["app"]:
continue
# app exist
if hasattr(forward, "stream"):
if len(forward["stream"]) > 0 and req["stream"] != forward["stream"]:
continue
# no url
if forward["url"] is None:
continue

# url maybe spell full rtmp address
url = forward["url"]
if url.find("rtmp://") == -1:
# format: xxx:xxx
# maybe you should use destination config
url = "rtmp://%s/%s"%(url, req['app'])
if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1:
url = url + "?vhost=" + req['vhost']
url = url + "/" + req['stream']
if len(new_req_param) > 0:
url = url + "?" + new_req_param

# append
forwards.append({
"url": url,
})

# secne: parse client params, like:
# format1: rtmp://srs-server/live/stream?forward=aliyuncdn.com:1936&token=xxxxxx
# format2: rtmp://srs-server/live/stream?forward=rtmp://cdn.com/myapp/mystream?XXXXXX
for destination in destinations:
url = destination["url"]
if url.find("rtmp://") == -1:
# format: xxx:xxx
# maybe you should use destination config
url = "rtmp://%s/%s"%(url, req['app'])
if len(req['vhost']) > 0 and req['vhost'] != "__defaultVhost__" and url.find(req['vhost']) == -1:
url = url + "?vhost=" + req['vhost']
url = url + "/" + req['stream']
if len(new_req_param) > 0:
url = url + "?" + new_req_param

# append
forwards.append({
"url": url,
})

return json.dumps({"code": int(code), "data": {"forwards": forwards}})

# HTTP RESTful path.
class Root(object):
exposed = True
Expand Down Expand Up @@ -846,6 +1013,7 @@ def __init__(self):
self.chats = RESTChats()
self.servers = RESTServers()
self.snapshots = RESTSnapshots()
self.forward = RESTForward()
def GET(self):
enable_crossdomain();
return json.dumps({"code":Error.success, "urls":{
Expand Down
17 changes: 16 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2795,7 +2795,7 @@ srs_error_t SrsConfig::check_normal_config()
} else if (n == "forward") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "destination") {
if (m != "enabled" && m != "destination" && m != "backend") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -4605,6 +4605,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost)
return conf->get("destination");
}

SrsConfDirective* SrsConfig::get_forward_backend(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}

conf = conf->get("forward");
if (!conf) {
return NULL;
}

return conf->get("backend");
}

SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,8 @@ class SrsConfig
virtual bool get_forward_enabled(SrsConfDirective* vhost);
// Get the forward directive of vhost.
virtual SrsConfDirective* get_forwards(std::string vhost);
// Get the forward directive of backend.
virtual SrsConfDirective* get_forward_backend(std::string vhost);

public:
// Whether the srt sevice enabled
Expand Down
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder()

srs_freep(sh_video);
srs_freep(sh_audio);

srs_freep(req);
}

srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
Expand All @@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)

// it's ok to use the request object,
// SrsLiveSource already copy it and never delete it.
req = r;
req = r->copy();

// the ep(endpoint) to forward to
ep_forward = ep;
Expand Down
94 changes: 94 additions & 0 deletions trunk/src/app/srs_app_http_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,100 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por
return err;
}

// Request:
// POST /api/v1/forward
// {
// "action": "on_forward",
// "server_id": "vid-k21d7y2",
// "client_id": "9o7g1330",
// "ip": "127.0.0.1",
// "vhost": "__defaultVhost__",
// "app": "live",
// "tcUrl": "rtmp://127.0.0.1:1935/live",
// "stream": "livestream",
// "param": "?forward=rtmp://ossrs.net/live/livestream"
// }
// Response:
// {
// "code": 0,
// "data": {
// "forwards":[{
// "url": "rtmp://ossrs.net:1935/live/livestream?auth_token=xxx"
// },{
// "url": "rtmp://aliyuncdn.com:1935/live/livestream?auth_token=xxx"
// }]
// }
// }
srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector<std::string>& rtmp_urls)
{
srs_error_t err = srs_success;

SrsContextId cid = _srs_context->get_id();

SrsStatistic* stat = SrsStatistic::instance();

SrsJsonObject* obj = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, obj);

obj->set("action", SrsJsonAny::str("on_forward"));
obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str()));
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));

std::string data = obj->dumps();
std::string res;
int status_code;

SrsHttpClient http;
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
}

// parse string res to json.
SrsJsonAny* info = SrsJsonAny::loads(res);
if (!info) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str());
}
SrsAutoFree(SrsJsonAny, info);

// response error code in string.
if (!info->is_object()) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str());
}

SrsJsonAny* prop = NULL;
// response standard object, format in json: {}
SrsJsonObject* res_info = info->to_object();
if ((prop = res_info->ensure_property_object("data")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str());
}

SrsJsonObject* p = prop->to_object();
if ((prop = p->ensure_property_array("forwards")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse forwards %s", res.c_str());
}

SrsJsonArray* forwards = prop->to_array();
for (int i = 0; i < forwards->count(); i++) {
prop = forwards->at(i);
SrsJsonObject* forward = prop->to_object();
if ((prop = forward->ensure_property_string("url")) != NULL) {
rtmp_urls.push_back(prop->to_str());
}
}

srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s",
cid.c_str(), url.c_str(), data.c_str(), res.c_str());

return err;
}

srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res)
{
srs_error_t err = srs_success;
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/app/srs_app_http_hooks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <srs_core.hpp>

#include <string>
#include <vector>

class SrsHttpUri;
class SrsStSocket;
Expand Down Expand Up @@ -79,6 +80,10 @@ class SrsHttpHooks
static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
// Discover co-workers for origin cluster.
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
// The on_forward_backend hook, when publish stream start to forward
// @param url the api server url, to valid the client.
// ignore if empty.
static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector<std::string>& rtmp_urls);
private:
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);
};
Expand Down
Loading

0 comments on commit 109fdc7

Please sign in to comment.