diff --git a/src/projects/modules/containers/mpegts/mpegts_packager.cpp b/src/projects/modules/containers/mpegts/mpegts_packager.cpp index ef41df9bd..5345736fb 100644 --- a/src/projects/modules/containers/mpegts/mpegts_packager.cpp +++ b/src/projects/modules/containers/mpegts/mpegts_packager.cpp @@ -118,6 +118,104 @@ namespace mpegts _psi_packet_data = MergeTsPacketData(psi_packets); } + bool Packager::InsertMarker(uint32_t data_track_id, const Marker &marker) + { + std::lock_guard lock(_markers_guard); + + auto data_track = GetMediaTrack(data_track_id); + if (data_track == nullptr) + { + logte("Data track is not found for track_id %u", data_track_id); + return false; + } + + // mpeg-2 ts time base is 90khz + int64_t timestamp = (static_cast(marker.timestamp) / data_track->GetTimeBase().GetTimescale() * 90000.0); + + Marker converted_marker = marker; + converted_marker.timestamp = timestamp; + + // if there is CUE-IN event in the marker list, remove it + if (marker.tag.UpperCaseString() == "CUEEVENT-IN") + { + // first item is CUE-IN event in the marker list, remove it + if (_markers.empty() == false) + { + auto it = _markers.begin(); + if (it->second.tag.UpperCaseString() == "CUEEVENT-IN") + { + logti("CUE-IN event is already in the marker list, remove it"); + _markers.erase(it); + } + } + else if (_last_removed_marker.tag.UpperCaseString() == "CUEEVENT-IN" || _last_removed_marker.tag.IsEmpty()) + { + // if it was already applied, do not insert + logtw("CUE-IN event needs CUE-OUT event before it, cannot insert CUE-IN event"); + return false; + } + } + else if (marker.tag.UpperCaseString() == "CUEEVENT-OUT") + { + // if there is CUE-OUT event in the marker list, remove it + if (_markers.empty() == false) + { + auto it = _markers.begin(); + auto first_marker = it->second; + if (first_marker.tag.UpperCaseString() == "CUEEVENT-OUT") + { + // Cannot insert CUE-OUT event if there is already CUE-OUT event + logtw("Cannot insert CUE-OUT event if there is already CUE-OUT event in the marker list"); + return false; + } + else if (first_marker.timestamp >= timestamp) + { + // Cannot insert CUE-OUT event before the CUE-IN event + logtw("Cannot insert CUE-OUT event before the CUE-IN event"); + return false; + } + } + } + + _markers.emplace(timestamp, converted_marker); + + return true; + } + + bool Packager::HasMarker() const + { + std::shared_lock lock(_markers_guard); + return _markers.empty() == false; + } + + const Marker Packager::GetFirstMarker() const + { + std::shared_lock lock(_markers_guard); + if (_markers.empty() == true) + { + return Marker(); + } + + return _markers.begin()->second; + } + + bool Packager::RemoveMarker(int64_t timestamp) + { + std::lock_guard lock(_markers_guard); + + auto it = _markers.find(timestamp); + if (it == _markers.end()) + { + return false; + } + + // It means that the last removed marker was applied + _last_removed_marker = it->second; + + _markers.erase(it); + return true; + } + void Packager::OnFrame(const std::shared_ptr &media_packet, const std::vector> &pes_packets) { //logtd("OnFrame track_id %u", media_packet->GetTrackId()); @@ -132,20 +230,37 @@ namespace mpegts return; } - if (track_id == _main_track_id && - sample_buffer->GetCurrentDurationUs() >= _config.target_duration_ms * 1000) - { - if (media_packet->GetMediaType() == cmn::MediaType::Video && media_packet->IsKeyFrame()) - { - sample_buffer->MarkSegmentBoundary(); - } - else if (media_packet->GetMediaType() == cmn::MediaType::Audio) - { - sample_buffer->MarkSegmentBoundary(); - } - } + auto sample = mpegts::Sample(media_packet, MergeTsPacketData(pes_packets), track->GetTimeBase().GetTimescale()); + + if (track_id == _main_track_id) + { + if (_force_make_boundary == false && HasMarker() == true) + { + auto marker = GetFirstMarker(); + if (marker.timestamp >= sample._dts && marker.timestamp < sample._dts + sample._duration) + { + logti("Stream(%s) Track(%u) has a marker at %lld, force to create a new boundary", _config.stream_id_meta.CStr(), track_id, marker.timestamp); - sample_buffer->AddSample(mpegts::Sample(media_packet, MergeTsPacketData(pes_packets), track->GetTimeBase().GetTimescale())); + _force_make_boundary = true; + } + } + + if ((sample_buffer->GetCurrentDurationUs() >= _config.target_duration_ms * 1000) || _force_make_boundary == true) + { + if (media_packet->GetMediaType() == cmn::MediaType::Video && media_packet->IsKeyFrame()) + { + sample_buffer->MarkSegmentBoundary(); + _force_make_boundary = false; + } + else if (media_packet->GetMediaType() == cmn::MediaType::Audio) + { + sample_buffer->MarkSegmentBoundary(); + _force_make_boundary = false; + } + } + } + + sample_buffer->AddSample(sample); CreateSegmentIfReady(); } @@ -238,6 +353,45 @@ namespace mpegts auto main_segment_duration_us = main_sample_buffer->GetDurationUntilSegmentBoundaryUs(); uint64_t total_main_segment_duration_us = main_sample_buffer->GetTotalConsumedDurationUs() + main_segment_duration_us; + + bool found_marker = false; + Marker marker; + if (HasMarker() == true) + { + int64_t main_segment_base_timestamp = main_sample_buffer->GetSample()._dts; + int64_t main_segment_duration = main_segment_duration_us / 1000000 * 90000; + int64_t main_segment_end_timestamp = main_segment_base_timestamp + main_segment_duration; + + while (HasMarker()) + { + marker = GetFirstMarker(); + if (marker.timestamp < main_segment_base_timestamp) + { + logte("Stream(%s) Main Track(%u) has a marker at %lld, but it is before the current segment", _config.stream_id_meta.CStr(), _main_track_id, marker.timestamp); + RemoveMarker(marker.timestamp); + } + else + { + break; + } + } + + // All Marker can be removed so check again + if (HasMarker() == true) + { + marker = GetFirstMarker(); + if (marker.timestamp >= main_segment_base_timestamp && marker.timestamp < main_segment_end_timestamp) + { + logti("Stream(%s) Main Track(%u) has a marker at %lld, force to create a new segment", _config.stream_id_meta.CStr(), _main_track_id, marker.timestamp); + + RemoveMarker(marker.timestamp); + found_marker = true; + force_create = true; + } + + // Else wait for the next segment + } + } if (force_create == false) { @@ -300,6 +454,10 @@ namespace mpegts } auto segment = std::make_shared(GetNextSegmentId(), first_sample._dts, main_segment_duration_us); + if (found_marker == true) + { + segment->SetMarker(marker); + } // Add PSI packets segment->AddPacketData(_psi_packet_data); diff --git a/src/projects/modules/containers/mpegts/mpegts_packager.h b/src/projects/modules/containers/mpegts/mpegts_packager.h index 9b8355fde..935135d4c 100644 --- a/src/projects/modules/containers/mpegts/mpegts_packager.h +++ b/src/projects/modules/containers/mpegts/mpegts_packager.h @@ -17,6 +17,13 @@ namespace mpegts { constexpr size_t SEGMENT_BUFFER_SIZE = 2000000; + struct Marker + { + int64_t timestamp = -1; + ov::String tag; + std::shared_ptr data = nullptr; + }; + class Segment { public: @@ -122,6 +129,21 @@ namespace mpegts return nullptr; } + bool HasMarker() const + { + return _marker.timestamp != -1; + } + + void SetMarker(const Marker &marker) + { + _marker = marker; + } + + const Marker &GetMarker() const + { + return _marker; + } + private: uint64_t _segment_id = 0; int64_t _first_dts = -1; @@ -133,6 +155,8 @@ namespace mpegts bool _is_data_in_memory = false; bool _is_data_in_file = false; + + Marker _marker; }; struct Sample @@ -146,11 +170,13 @@ namespace mpegts { _pts = (static_cast(media_packet->GetPts()) / timescale * 90000.0); _dts = (static_cast(media_packet->GetDts()) / timescale * 90000.0); + _duration = (static_cast(media_packet->GetDuration()) / timescale * 90000.0); } } int64_t _pts = -1; int64_t _dts = -1; + int64_t _duration = -1; std::shared_ptr media_packet = nullptr; std::shared_ptr ts_packet_data = nullptr; @@ -367,6 +393,10 @@ namespace mpegts bool AddSink(const std::shared_ptr &sink); + // When a marker is added, create a segment as soon as possible and send the information to the Sink. + // track_id is data track id, not the main track id + bool InsertMarker(uint32_t data_track_id, const Marker &marker); + //////////////////////////////// // PacketizerSink interface //////////////////////////////// @@ -421,10 +451,17 @@ namespace mpegts ov::String GetDvrStoragePath() const; ov::String GetSegmentFilePath(uint64_t segment_id) const; - + + bool HasMarker() const; + const Marker GetFirstMarker() const; + bool RemoveMarker(int64_t timestamp); + ov::String _packager_id; Config _config; + // make segment boundary as soon as possible + bool _force_make_boundary = false; + // track_id -> SampleBuffer std::map> _sample_buffers; @@ -433,6 +470,9 @@ namespace mpegts std::vector> _psi_packets; std::shared_ptr _psi_packet_data; + std::map _markers; + mutable std::shared_mutex _markers_guard; + uint64_t _last_segment_id = 0; std::map> _segments; @@ -447,5 +487,7 @@ namespace mpegts mutable std::shared_mutex _retained_segments_guard; std::vector> _sinks; + + Marker _last_removed_marker; }; } \ No newline at end of file diff --git a/src/projects/modules/data_format/cue_event/cue_event.cpp b/src/projects/modules/data_format/cue_event/cue_event.cpp index 7a1da3d8b..339caa8ad 100644 --- a/src/projects/modules/data_format/cue_event/cue_event.cpp +++ b/src/projects/modules/data_format/cue_event/cue_event.cpp @@ -22,7 +22,23 @@ std::shared_ptr CueEvent::Parse(const std::shared_ptr &data) ov::ByteStream stream(data); - CueType cue_type = static_cast(stream.Read8()); + if (data->GetLength() != 5) + { + return nullptr; + } + + uint8_t cue_type_value = stream.Read8(); + if (cue_type_value > static_cast(CueType::IN)) + { + return nullptr; + } + + CueType cue_type = static_cast(cue_type_value); + if (cue_type == CueType::Unknown) + { + return nullptr; + } + uint32_t duration_msec = stream.ReadBE32(); return Create(cue_type, duration_msec); @@ -56,4 +72,27 @@ std::shared_ptr CueEvent::Serialize() const stream.WriteBE32(_duration_msec); return stream.GetDataPointer(); +} + +CueEvent::CueType CueEvent::GetCueType() const +{ + return _cue_type; +} + +ov::String CueEvent::GetCueTypeName() const +{ + switch (_cue_type) + { + case CueType::OUT: + return "OUT"; + case CueType::IN: + return "IN"; + default: + return "Unknown"; + } +} + +uint32_t CueEvent::GetDurationMsec() const +{ + return _duration_msec; } \ No newline at end of file diff --git a/src/projects/modules/data_format/cue_event/cue_event.h b/src/projects/modules/data_format/cue_event/cue_event.h index f7e045b7d..193081056 100644 --- a/src/projects/modules/data_format/cue_event/cue_event.h +++ b/src/projects/modules/data_format/cue_event/cue_event.h @@ -37,6 +37,10 @@ class CueEvent std::shared_ptr Serialize() const; + CueType GetCueType() const; + ov::String GetCueTypeName() const; + uint32_t GetDurationMsec() const; + private: CueType _cue_type; uint32_t _duration_msec; diff --git a/src/projects/publishers/hls/hls_media_playlist.cpp b/src/projects/publishers/hls/hls_media_playlist.cpp index 19aa9331e..f24d8429e 100644 --- a/src/projects/publishers/hls/hls_media_playlist.cpp +++ b/src/projects/publishers/hls/hls_media_playlist.cpp @@ -10,6 +10,8 @@ #include "hls_media_playlist.h" #include "hls_private.h" +#include + HlsMediaPlaylist::HlsMediaPlaylist(const ov::String &id, const ov::String &playlist_file_name, const HlsMediaPlaylistConfig &config) : _config(config) , _variant_name(id) @@ -43,6 +45,11 @@ bool HlsMediaPlaylist::OnSegmentCreated(const std::shared_ptr & logtd("HlsMediaPlaylist::OnSegmentCreated - number(%d) url(%s) duration_us(%llu)\n", segment->GetNumber(), segment->GetUrl().CStr(), segment->GetDurationUs()); + if (segment->HasMarker() == true) + { + logti("Marker is found in the segment %d, tag : %s, timestamp : %lld", segment->GetNumber(), segment->GetMarker().tag.CStr(), segment->GetMarker().timestamp); + } + _segments.emplace(segment->GetNumber(), segment); return true; diff --git a/src/projects/publishers/hls/hls_stream.cpp b/src/projects/publishers/hls/hls_stream.cpp index b9aa8ce20..ad2d33cdd 100755 --- a/src/projects/publishers/hls/hls_stream.cpp +++ b/src/projects/publishers/hls/hls_stream.cpp @@ -317,6 +317,48 @@ void HlsStream::SendDataFrame(const std::shared_ptr &media_packet) SendBufferedPackets(); } + if (media_packet->GetBitstreamFormat() == cmn::BitstreamFormat::CUE) + { + mpegts::Marker marker; + marker.timestamp = media_packet->GetDts(); + marker.data = media_packet->GetData()->Clone(); + + // Parse the cue data + auto cue_event = CueEvent::Parse(marker.data); + if (cue_event == nullptr) + { + logte("(%s/%s) Failed to parse the cue event data", GetApplication()->GetVHostAppName().CStr(), GetName().CStr()); + return; + } + + marker.tag = ov::String::FormatString("CueEvent-%s", cue_event->GetCueTypeName().CStr()); + + // Insert marker to all packagers + for (auto &it : _packagers) + { + auto packager = it.second; + auto result = packager->InsertMarker(media_packet->GetTrackId(), marker); + + if (result == true && cue_event->GetCueType() == CueEvent::CueType::OUT) + { + // Make CUE-IN event after the duration + auto duration_msec = cue_event->GetDurationMsec(); + auto data_track = GetTrack(media_packet->GetTrackId()); + auto cue_in_timestamp = media_packet->GetDts() + (duration_msec * data_track->GetTimeBase().GetTimescale() / 1000); + + // Create CUE-IN event + mpegts::Marker cue_in_marker; + cue_in_marker.timestamp = cue_in_timestamp; + cue_in_marker.tag = "CueEvent-IN"; + cue_in_marker.data = CueEvent::Create(CueEvent::CueType::IN, 0)->Serialize(); + + packager->InsertMarker(media_packet->GetTrackId(), cue_in_marker); + } + } + + return; + } + AppendMediaPacket(media_packet); } diff --git a/src/projects/publishers/hls/hls_stream.h b/src/projects/publishers/hls/hls_stream.h index b1b76f90f..49f2906e4 100755 --- a/src/projects/publishers/hls/hls_stream.h +++ b/src/projects/publishers/hls/hls_stream.h @@ -14,6 +14,8 @@ #include #include +#include + #include "hls_master_playlist.h" #include "hls_media_playlist.h"