Skip to content

Commit

Permalink
Added sendEvents API
Browse files Browse the repository at this point in the history
  • Loading branch information
getroot committed Dec 19, 2024
1 parent 9c3a366 commit b6ec0e5
Show file tree
Hide file tree
Showing 21 changed files with 301 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
#include "../../../../../api_private.h"

#include <base/provider/application.h>
#include <modules/id3v2/id3v2.h>
#include <modules/id3v2/frames/id3v2_frames.h>
#include <modules/data_format/id3v2/id3v2.h>
#include <modules/data_format/id3v2/frames/id3v2_frames.h>
#include <modules/data_format/cue_event/cue_event.h>

namespace api
{
Expand All @@ -25,6 +26,7 @@ namespace api
RegisterPost(R"((startHlsDump))", &StreamActionsController::OnPostStartHLSDump);
RegisterPost(R"((stopHlsDump))", &StreamActionsController::OnPostStopHLSDump);
RegisterPost(R"((sendEvent))", &StreamActionsController::OnPostSendEvent);
RegisterPost(R"((sendEvents))", &StreamActionsController::OnPostSendEvents);

RegisterPost(R"((concludeHlsLive))", &StreamActionsController::OnPostConcludeHlsLive);
}
Expand Down Expand Up @@ -218,6 +220,42 @@ namespace api
return {http::StatusCode::OK};
}

// POST /v1/vhosts/<vhost_name>/apps/<app_name>/streams/<stream_name>:injectHLSEvent
ApiResponse StreamActionsController::OnPostSendEvents(const std::shared_ptr<http::svr::HttpExchange> &client, const Json::Value &request_body,
const std::shared_ptr<mon::HostMetrics> &vhost,
const std::shared_ptr<mon::ApplicationMetrics> &app,
const std::shared_ptr<mon::StreamMetrics> &stream,
const std::vector<std::shared_ptr<mon::StreamMetrics>> &output_streams)
{
if (request_body.isArray() == false || request_body.size() == 0)
{
throw http::HttpError(http::StatusCode::BadRequest, "events(array) is required");
}

MultipleStatus status_codes;
Json::Value response_value(Json::ValueType::arrayValue);
for (const auto &request_event : request_body)
{
try
{
OnPostSendEvent(client, request_event, vhost, app, stream, output_streams);
status_codes.AddStatusCode(http::StatusCode::OK);

Json::Value response;
response["statusCode"] = static_cast<int>(http::StatusCode::OK);
response["message"] = StringFromStatusCode(http::StatusCode::OK);
response_value.append(response);
}
catch (const http::HttpError &error)
{
status_codes.AddStatusCode(error.GetStatusCode());
response_value.append(::serdes::JsonFromError(error));
}
}

return {status_codes, std::move(response_value)};
}

// POST /v1/vhosts/<vhost_name>/apps/<app_name>/streams/<stream_name>:injectHLSEvent
ApiResponse StreamActionsController::OnPostSendEvent(const std::shared_ptr<http::svr::HttpExchange> &client, const Json::Value &request_body,
const std::shared_ptr<mon::HostMetrics> &vhost,
Expand All @@ -226,6 +264,8 @@ namespace api
const std::vector<std::shared_ptr<mon::StreamMetrics>> &output_streams)
{
// Validate request body

// ID3 Event
// {
// "eventFormat": "id3v2",
// "eventType": "video",
Expand All @@ -249,79 +289,48 @@ namespace api
// ]
// }

// Cue Event
// {
// "eventFormat": "cue",
// "events":[
// {
// "cueType": "out", // out | in
// "duration": 60500 // milliseconds, only available when cueType is out
// }
// ]
// }

if (request_body.isMember("eventFormat") == false || request_body["eventFormat"].isString() == false ||
request_body.isMember("events") == false || request_body["events"].isArray() == false)
request_body.isMember("events") == false || request_body["events"].isArray() == false || request_body["events"].size() == 0)
{
throw http::HttpError(http::StatusCode::BadRequest, "eventFormat(string) and events(array) are required");
}

// Now only support ID3v2 format
cmn::BitstreamFormat event_format = cmn::BitstreamFormat::Unknown;
ov::String event_format_string = request_body["eventFormat"].asString().c_str();
if (event_format_string.UpperCaseString() != "ID3V2")
std::shared_ptr<ov::Data> events_data = nullptr;

if (event_format_string.UpperCaseString() == "ID3V2")
{
throw http::HttpError(http::StatusCode::BadRequest, "eventFormat is not supported: [%s]", event_format_string.CStr());
event_format = cmn::BitstreamFormat::ID3v2;
events_data = MakeID3Data(request_body["events"]);
}

cmn::BitstreamFormat event_format = cmn::BitstreamFormat::ID3v2;
bool urgent = false;

if (request_body.isMember("urgent") == true && request_body["urgent"].isBool() == true)
else if (event_format_string.UpperCaseString() == "CUE")
{
urgent = request_body["urgent"].asBool();
event_format = cmn::BitstreamFormat::CUE;
events_data = MakeCueData(request_body["events"]);
}

auto events = request_body["events"];
if (events.size() == 0)
else
{
throw http::HttpError(http::StatusCode::BadRequest, "events is empty");
throw http::HttpError(http::StatusCode::BadRequest, "eventFormat is not supported: [%s]", event_format_string.CStr());
}

// Make ID3v2 tags
auto id3v2_event = std::make_shared<ID3v2>();
id3v2_event->SetVersion(4, 0);
for (const auto &event : events)
if (events_data == nullptr)
{
if (event.isMember("frameType") == false || event["frameType"].isString() == false)
{
throw http::HttpError(http::StatusCode::BadRequest, "frameType is required in events");
}

ov::String frame_type = event["frameType"].asString().c_str();
ov::String info;
ov::String data;

if (event.isMember("info") == true && event["info"].isString() == true)
{
info = event["info"].asString().c_str();
}

if (event.isMember("data") == true && event["data"].isString() == true)
{
data = event["data"].asString().c_str();
}

std::shared_ptr<ID3v2Frame> frame;
if (frame_type.UpperCaseString() == "TXXX")
{
frame = std::make_shared<ID3v2TxxxFrame>(info, data);
}
else if (frame_type.UpperCaseString().Get(0) == 'T')
{
frame = std::make_shared<ID3v2TextFrame>(frame_type, data);
}
else if (frame_type.UpperCaseString() == "PRIV")
{
frame = std::make_shared<ID3v2PrivFrame>(info, data);
}
else
{
throw http::HttpError(http::StatusCode::BadRequest, "frameType is not supported: [%s]", frame_type.CStr());
}

id3v2_event->AddFrame(frame);
throw http::HttpError(http::StatusCode::BadRequest, "Could not make events data");
}

// Event Type
// Event Type (Optional)
cmn::PacketType event_type = cmn::PacketType::EVENT;
if (request_body.isMember("eventType") == true)
{
Expand Down Expand Up @@ -349,7 +358,14 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest, "eventType is not supported: [%s]", event_type_string.CStr());
}
}


// Urgent (Optional)
bool urgent = false;
if (request_body.isMember("urgent") == true && request_body["urgent"].isBool() == true)
{
urgent = request_body["urgent"].asBool();
}

auto source_stream = GetSourceStream(stream);
if (source_stream == nullptr)
{
Expand All @@ -358,7 +374,7 @@ namespace api
vhost->GetName().CStr(), app->GetVHostAppName().GetAppName().CStr(), stream->GetName().CStr());
}

if (source_stream->SendDataFrame(-1, event_format, event_type, id3v2_event->Serialize(), urgent) == false)
if (source_stream->SendDataFrame(-1, event_format, event_type, events_data, urgent) == false)
{
throw http::HttpError(http::StatusCode::InternalServerError,
"Internal Server Error - Could not inject event: [%s/%s/%s]",
Expand Down Expand Up @@ -473,5 +489,88 @@ namespace api

return application->GetStreamByName(stream->GetName());
}
}
}

std::shared_ptr<ov::Data> StreamActionsController::MakeID3Data(const Json::Value &events)
{
// Make ID3v2 tags
auto id3v2_event = std::make_shared<ID3v2>();
id3v2_event->SetVersion(4, 0);
for (const auto &event : events)
{
if (event.isMember("frameType") == false || event["frameType"].isString() == false)
{
throw http::HttpError(http::StatusCode::BadRequest, "frameType is required in events");
}

ov::String frame_type = event["frameType"].asString().c_str();
ov::String info;
ov::String data;

if (event.isMember("info") == true && event["info"].isString() == true)
{
info = event["info"].asString().c_str();
}

if (event.isMember("data") == true && event["data"].isString() == true)
{
data = event["data"].asString().c_str();
}

std::shared_ptr<ID3v2Frame> frame;
if (frame_type.UpperCaseString() == "TXXX")
{
frame = std::make_shared<ID3v2TxxxFrame>(info, data);
}
else if (frame_type.UpperCaseString().Get(0) == 'T')
{
frame = std::make_shared<ID3v2TextFrame>(frame_type, data);
}
else if (frame_type.UpperCaseString() == "PRIV")
{
frame = std::make_shared<ID3v2PrivFrame>(info, data);
}
else
{
throw http::HttpError(http::StatusCode::BadRequest, "frameType is not supported: [%s]", frame_type.CStr());
}

id3v2_event->AddFrame(frame);
}

return id3v2_event->Serialize();
}

std::shared_ptr<ov::Data> StreamActionsController::MakeCueData(const Json::Value &events)
{
if (events.size() == 0)
{
throw http::HttpError(http::StatusCode::BadRequest, "events must have at least one event");
}

// only first event is used
auto event = events[0];
if (event.isMember("cueType") == false || event["cueType"].isString() == false)
{
throw http::HttpError(http::StatusCode::BadRequest, "cueType is required in events");
}

ov::String cue_type = event["cueType"].asString().c_str();
CueEvent::CueType type = CueEvent::GetCueTypeByName(cue_type);
if (type == CueEvent::CueType::Unknown)
{
throw http::HttpError(http::StatusCode::BadRequest, "cueType is not supported: [%s]", cue_type.CStr());
}

// duration (optional)
uint32_t duration_msec = 0;
if (event.isMember("duration") == true && event["duration"].isUInt() == true)
{
duration_msec = event["duration"].asUInt();
}

auto cue_event = CueEvent::Create(type, duration_msec);

return cue_event->Serialize();
}
} // namespace v1
} // namespace api
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ namespace api
const std::shared_ptr<mon::ApplicationMetrics> &app,
const std::shared_ptr<mon::StreamMetrics> &stream,
const std::vector<std::shared_ptr<mon::StreamMetrics>> &output_streams);

// POST /v1/vhosts/<vhost_name>/apps/<app_name>/streams/<stream_name>:sendEvent
ApiResponse OnPostSendEvents(const std::shared_ptr<http::svr::HttpExchange> &client, const Json::Value &request_body,
const std::shared_ptr<mon::HostMetrics> &vhost,
const std::shared_ptr<mon::ApplicationMetrics> &app,
const std::shared_ptr<mon::StreamMetrics> &stream,
const std::vector<std::shared_ptr<mon::StreamMetrics>> &output_streams);

// POST /v1/vhosts/<vhost_name>/apps/<app_name>/streams/<stream_name>:concludeHlsLive
ApiResponse OnPostConcludeHlsLive(const std::shared_ptr<http::svr::HttpExchange> &client, const Json::Value &request_body,
Expand Down Expand Up @@ -89,6 +96,10 @@ namespace api

return std::static_pointer_cast<T>(stream);
}


std::shared_ptr<ov::Data> MakeID3Data(const Json::Value &events); // ID3v2
std::shared_ptr<ov::Data> MakeCueData(const Json::Value &events); // CUE
};
}
}
4 changes: 3 additions & 1 deletion src/projects/base/mediarouter/media_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ namespace cmn

MP3,

OVEN_EVENT // OvenMediaEngine defined event
OVEN_EVENT, // OvenMediaEngine defined event

CUE
};

enum class PacketType : int8_t
Expand Down
1 change: 1 addition & 0 deletions src/projects/main/AMS.mk
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ LOCAL_STATIC_LIBRARIES := \
rtp_rtcp \
sdp \
id3v2 \
cue_event \
segment_writer \
web_console \
mediarouter \
Expand Down
4 changes: 2 additions & 2 deletions src/projects/modules/containers/bmff/bmff_packager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

#include "bmff_packager.h"
#include "bmff_private.h"
#include <modules/id3v2/id3v2.h>
#include <modules/id3v2/frames/id3v2_frames.h>
#include <modules/data_format/id3v2/id3v2.h>
#include <modules/data_format/id3v2/frames/id3v2_frames.h>
namespace bmff
{
Packager::Packager(const std::shared_ptr<const MediaTrack> &media_track, const std::shared_ptr<const MediaTrack> &data_track, const CencProperty &cenc_property)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include <modules/bitstream/nalu/nal_stream_converter.h>
#include <modules/bitstream/aac/aac_converter.h>

#include <modules/id3v2/id3v2.h>
#include <modules/id3v2/frames/id3v2_text_frame.h>
#include <modules/data_format/id3v2/id3v2.h>
#include <modules/data_format/id3v2/frames/id3v2_text_frame.h>

namespace bmff
{
Expand Down
5 changes: 5 additions & 0 deletions src/projects/modules/containers/mpegts/mpegts_packetizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ namespace mpegts
return false;
}

if (media_packet->GetMediaType() == cmn::MediaType::Data && media_packet->GetBitstreamFormat() != cmn::BitstreamFormat::ID3v2)
{
return false;
}

auto pid = GetElementaryPid(media_packet->GetTrackId());
auto pes = Pes::Build(pid, track, media_packet);
if (pes == nullptr)
Expand Down
3 changes: 3 additions & 0 deletions src/projects/modules/data_format/AMS.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
LOCAL_PATH := $(call get_local_path)

include $(BUILD_SUB_AMS)
8 changes: 8 additions & 0 deletions src/projects/modules/data_format/cue_event/AMS.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
LOCAL_PATH := $(call get_local_path)
include $(DEFAULT_VARIABLES)

LOCAL_TARGET := cue_event

$(call add_pkg_config,srt)

include $(BUILD_STATIC_LIBRARY)
Loading

0 comments on commit b6ec0e5

Please sign in to comment.