From 0b5c45b0c304c91ccce98c1e3ddf71ff5bc7af75 Mon Sep 17 00:00:00 2001 From: Niyati Maheshwari Date: Thu, 16 May 2024 09:23:09 -0700 Subject: [PATCH] [FEATURE] Support putFragmentMetadata from kvssink (#1122) * use custom event for putFragmentMetadata from kvssink * kvssink-fragment-metadata * clwanup * rename to kvs-add-metadata * limit metadata when max count reached * cleanup send_custom_event * max fragment count * cleanup * revert log4cplus cmake * use MAX_FRAGMENT_METADATA_TAGS * abstract event trigger * cleanup: rename * update failure case check * remove logic maintaining the count * cleanup * cleanup-2 * cleanup-3 * address comments * fix new lines and indentation * fix new lines and indentation * fix new lines and indentation * clean up * cleanup * cleanup -3 * remove gstkvssink.h inmport from samples * remove unused unique_ptr * fix the log-line --- .gitignore | 1 + samples/include.h | 5 + samples/kvssink_gstreamer_sample.cpp | 154 ++++++++++++++++++++------- src/gstreamer/gstkvssink.cpp | 24 +++-- 4 files changed, 136 insertions(+), 48 deletions(-) diff --git a/.gitignore b/.gitignore index 0fa1d705..b0795623 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .github/github_deploy_key.enc .idea build +media cmake-build-debug/ cmake-build-release/ doc/ diff --git a/samples/include.h b/samples/include.h index 10dc7f42..98c2ee9c 100644 --- a/samples/include.h +++ b/samples/include.h @@ -1,6 +1,11 @@ #ifndef _KVS_SAMPLE_INCLUDE_H #define _KVS_SAMPLE_INCLUDE_H +#define KVS_ADD_METADATA_G_STRUCT_NAME "kvs-add-metadata" +#define KVS_ADD_METADATA_NAME "name" +#define KVS_ADD_METADATA_VALUE "value" +#define KVS_ADD_METADATA_PERSISTENT "persist" + #define STATUS_KVS_GSTREAMER_SAMPLE_BASE 0x00080000 #define STATUS_KVS_GSTREAMER_SAMPLE_ERROR STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000001 #define STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED STATUS_KVS_GSTREAMER_SAMPLE_BASE + 0x00000002 diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp index dbd539de..c8b6fdc6 100644 --- a/samples/kvssink_gstreamer_sample.cpp +++ b/samples/kvssink_gstreamer_sample.cpp @@ -5,9 +5,8 @@ #include #include #include -#include -#include "gstreamer/gstkvssink.h" #include +#include #include "include.h" using namespace std; @@ -50,6 +49,8 @@ typedef struct _CustomData { synthetic_dts(0), last_unpersisted_file_idx(0), stream_status(STATUS_SUCCESS), + put_fragment_metadata_frequency_seconds(2), + fragment_metadata_timer_id(0), base_pts(0), max_frame_pts(0), key_frame_pts(0), @@ -65,6 +66,10 @@ typedef struct _CustomData { bool h264_stream_supported; char *stream_name; mutex file_list_mtx; + int put_fragment_metadata_frequency_seconds; + int fragment_metadata_timer_id; + int metadata_counter = 0; + bool persist_flag = true; // list of files to upload. vector file_list; @@ -102,9 +107,7 @@ typedef struct _CustomData { volatile StreamSource streamSource; string rtsp_url; - - unique_ptr credential; - + uint64_t synthetic_dts; bool use_absolute_fragment_times; @@ -155,6 +158,11 @@ static bool resolution_supported(GstCaps *src_caps, GstCaps *query_caps_raw, Gst /* callback when eos (End of Stream) is posted on bus */ static void eos_cb(GstElement *sink, GstMessage *message, CustomData *data) { + if (data->fragment_metadata_timer_id != 0) { + g_source_remove(data->fragment_metadata_timer_id); + data->fragment_metadata_timer_id = 0; + LOG_TRACE("Removing the put_metadata timer"); + } if (data->streamSource == FILE_SOURCE) { // bookkeeping base_pts. add 1ms to avoid overlap. data->base_pts += +data->max_frame_pts + duration_cast(milliseconds(1)).count(); @@ -175,6 +183,12 @@ static void error_cb(GstBus *bus, GstMessage *msg, CustomData *data) { GError *err; gchar *debug_info; + if (data->fragment_metadata_timer_id != 0) { + g_source_remove(data->fragment_metadata_timer_id); + data->fragment_metadata_timer_id = 0; + LOG_TRACE("Removing the put_metadata timer"); + } + /* Print error details on the screen */ gst_message_parse_error(msg, &err, &debug_info); g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); @@ -215,8 +229,14 @@ void timer(CustomData *data) { /* Function handles sigint signal */ void sigint_handler(int sigint){ - LOG_DEBUG("SIGINT received. Exiting graceully"); + LOG_DEBUG("SIGINT received. Exiting gracefully"); + if (data_global.fragment_metadata_timer_id != 0) { + g_source_remove(data_global.fragment_metadata_timer_id); + data_global.fragment_metadata_timer_id = 0; + LOG_TRACE("Removing the put_metadata timer"); + } + if(data_global.main_loop != NULL){ g_main_loop_quit(data_global.main_loop); } @@ -236,17 +256,17 @@ void determine_credentials(GstElement *kvssink, CustomData *data) { nullptr != (private_key_path = getenv("PRIVATE_KEY_PATH")) && nullptr != (role_alias = getenv("ROLE_ALIAS")) && nullptr != (ca_cert_path = getenv("CA_CERT_PATH"))) { - // set the IoT Credentials if provided in envvar - GstStructure *iot_credentials = gst_structure_new( - "iot-certificate", - "iot-thing-name", G_TYPE_STRING, data->stream_name, - "endpoint", G_TYPE_STRING, iot_credential_endpoint, - "cert-path", G_TYPE_STRING, cert_path, - "key-path", G_TYPE_STRING, private_key_path, - "ca-path", G_TYPE_STRING, ca_cert_path, - "role-aliases", G_TYPE_STRING, role_alias, NULL); - - g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); + // set the IoT Credentials if provided in envvar + GstStructure *iot_credentials = gst_structure_new( + "iot-certificate", + "iot-thing-name", G_TYPE_STRING, data->stream_name, + "endpoint", G_TYPE_STRING, iot_credential_endpoint, + "cert-path", G_TYPE_STRING, cert_path, + "key-path", G_TYPE_STRING, private_key_path, + "ca-path", G_TYPE_STRING, ca_cert_path, + "role-aliases", G_TYPE_STRING, role_alias, NULL); + + g_object_set(G_OBJECT (kvssink), "iot-certificate", iot_credentials, NULL); gst_structure_free(iot_credentials); // kvssink will search for long term credentials in envvar automatically so no need to include here // if no long credentials or IoT credentials provided will look for credential file as last resort @@ -255,7 +275,65 @@ void determine_credentials(GstElement *kvssink, CustomData *data) { } } -int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline) { +/* +This function creates a GstStructure and uses it to trigger the GST_EVENT_CUSTOM_DOWNSTREAM for put_fragment_metadata +*/ +bool put_fragment_metadata(GstElement* element, const std::string name, const std::string value, bool persistent) { + GstStructure *metadata = gst_structure_new_empty(KVS_ADD_METADATA_G_STRUCT_NAME); + gst_structure_set(metadata, KVS_ADD_METADATA_NAME, G_TYPE_STRING, name.c_str(), + KVS_ADD_METADATA_VALUE, G_TYPE_STRING, value.c_str(), + KVS_ADD_METADATA_PERSISTENT, G_TYPE_BOOLEAN, persistent, NULL); + GstEvent* event = gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM, metadata); + LOG_TRACE("Emit the put_fragment_metadata event with structure: " << std::string(gst_structure_to_string (metadata))); + return gst_element_send_event(element, event); +} + +/* +Function to put fragment metadata: name, value, and persist values +This is a sample function. This function alternates between putting persistent and non-persistent metadata +until it puts maximum number (10) of metadata in a fragment. After that, it removes the timer that fires this function + +Customers can write their own put_metadata and trigger it either using the timer or some other logic. This function can contain custom logic to +generate the metadata. To trigger the downstream event that calls the API putKinesisVideoFragmentMetadata, +put_fragment_metadata must be called as shown below. +Example: + + "metadata_name_1", "metadata_value_1", 0 + "metadata_name_2", "metadata_value_2", 1 + "metadata_name_3", "metadata_value_3", 0 + "metadata_name_4", "metadata_value_4", 1 + "metadata_name_1", "metadata_value_5", 0 + "metadata_name_2", "metadata_value_6", 1 + "metadata_name_3", "metadata_value_7", 0 + "metadata_name_4", "metadata_value_8", 1 + +To remove a persistent metadata entry, call the same function with empty value + "metadata_name_2", "", 1 +*/ +static void put_metadata(GstElement* element) { + std::ostringstream metadata_name_stream, metadata_value_stream; + + ++data_global.metadata_counter; + data_global.persist_flag = !data_global.persist_flag; + + metadata_name_stream << "metadata_name_" << data_global.metadata_counter; + metadata_value_stream << "metadata_value_" << data_global.metadata_counter; + + // All even metadata_value_n are persistent, all odd ones are non-persistent. + if (data_global.metadata_counter == 2 * MAX_FRAGMENT_METADATA_COUNT) { + if (data_global.fragment_metadata_timer_id != 0) { + g_source_remove(data_global.fragment_metadata_timer_id); + data_global.fragment_metadata_timer_id = 0; + LOG_WARN("Removing the put_metadata timer as the the max capacity for metadata in a fragment is reached"); + } + } + + if (!put_fragment_metadata(element, metadata_name_stream.str(), metadata_value_stream.str(), data_global.persist_flag)) { + LOG_WARN("Failed to put fragment metadata with name:" << metadata_name_stream.str() << " and value:" << metadata_value_stream.str()); + } +} + +int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline, GstElement *kvssink) { bool vtenc = false, isOnRpi = false; @@ -328,7 +406,7 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem LOG_DEBUG("Streaming with live source and width: " << width << ", height: " << height << ", fps: " << framerate << ", bitrateInKBPS" << bitrateInKBPS); - GstElement *source_filter, *filter, *kvssink, *h264parse, *encoder, *source, *video_convert; + GstElement *source_filter, *filter, *h264parse, *encoder, *source, *video_convert; /* create the elements */ source_filter = gst_element_factory_make("capsfilter", "source_filter"); @@ -341,11 +419,6 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem LOG_ERROR("Failed to create capsfilter (2)"); return 1; } - kvssink = gst_element_factory_make("kvssink", "kvssink"); - if (!kvssink) { - LOG_ERROR("Failed to create kvssink"); - return 1; - } h264parse = gst_element_factory_make("h264parse", "h264parse"); // needed to enforce avc stream format if (!h264parse) { LOG_ERROR("Failed to create h264parse"); @@ -540,7 +613,7 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem return 0; } -int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline) { +int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElement *pipeline, GstElement *kvssink) { // process runtime if provided if (argc == 5){ if ((0 == STRCMPI(argv[3], "-runtime")) || @@ -552,10 +625,9 @@ int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElem } } } - GstElement *filter, *kvssink, *depay, *source, *h264parse; + GstElement *filter, *depay, *source, *h264parse; filter = gst_element_factory_make("capsfilter", "filter"); - kvssink = gst_element_factory_make("kvssink", "kvssink"); depay = gst_element_factory_make("rtph264depay", "depay"); source = gst_element_factory_make("rtspsrc", "source"); h264parse = gst_element_factory_make("h264parse", "h264parse"); @@ -603,14 +675,13 @@ int gstreamer_rtsp_source_init(int argc, char *argv[], CustomData *data, GstElem return 0; } -int gstreamer_file_source_init(CustomData *data, GstElement *pipeline) { +int gstreamer_file_source_init(CustomData *data, GstElement *pipeline, GstElement *kvssink) { - GstElement *demux, *kvssink, *filesrc, *h264parse, *filter, *queue; + GstElement *demux, *filesrc, *h264parse, *filter, *queue; string file_suffix; string file_path = data->file_list.at(data->current_file_idx).path; filter = gst_element_factory_make("capsfilter", "filter"); - kvssink = gst_element_factory_make("kvssink", "kvssink"); filesrc = gst_element_factory_make("filesrc", "filesrc"); h264parse = gst_element_factory_make("h264parse", "h264parse"); queue = gst_element_factory_make("queue", "queue"); @@ -680,28 +751,34 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) { /* init GStreamer */ gst_init(&argc, &argv); - GstElement *pipeline; + GstElement *pipeline, *kvssink; int ret; GstStateChangeReturn gst_ret; // Reset first frame pts data->first_pts = GST_CLOCK_TIME_NONE; + kvssink = gst_element_factory_make("kvssink", "kvssink"); + if (!kvssink) { + LOG_ERROR("Failed to create kvssink"); + return 1; + } + switch (data->streamSource) { case LIVE_SOURCE: LOG_INFO("Streaming from live source"); pipeline = gst_pipeline_new("live-kinesis-pipeline"); - ret = gstreamer_live_source_init(argc, argv, data, pipeline); + ret = gstreamer_live_source_init(argc, argv, data, pipeline, kvssink); break; case RTSP_SOURCE: LOG_INFO("Streaming from rtsp source"); pipeline = gst_pipeline_new("rtsp-kinesis-pipeline"); - ret = gstreamer_rtsp_source_init(argc, argv, data, pipeline); + ret = gstreamer_rtsp_source_init(argc, argv, data, pipeline, kvssink); break; case FILE_SOURCE: LOG_INFO("Streaming from file source"); pipeline = gst_pipeline_new("file-kinesis-pipeline"); - ret = gstreamer_file_source_init(data, pipeline); + ret = gstreamer_file_source_init(data, pipeline, kvssink); break; } @@ -715,12 +792,16 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) { g_signal_connect(G_OBJECT(bus), "message::error", (GCallback) error_cb, data); g_signal_connect(G_OBJECT(bus), "message::eos", G_CALLBACK(eos_cb), data); gst_object_unref(bus); + + // Create a GStreamer timer to generate and put fragment metadata tags every 2 seconds + data->fragment_metadata_timer_id = g_timeout_add_seconds(data->put_fragment_metadata_frequency_seconds, reinterpret_cast(put_metadata), kvssink); + /* start streaming */ gst_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING); if (gst_ret == GST_STATE_CHANGE_FAILURE) { g_printerr("Unable to set the pipeline to the playing state.\n"); gst_object_unref(pipeline); - data->stream_status = STATUS_KVS_GSTREAMER_SAMPLE_ERROR; + data->stream_status = STATUS_KVS_GSTREAMER_SAMPLE_ERROR; return 1; } // set timer if valid runtime provided (non-positive values are ignored) @@ -828,7 +909,7 @@ int main(int argc, char *argv[]) { } else if(stream_status == STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED){ LOG_ERROR("File upload interrupted. Terminating."); continue_uploading = false; - }else { // non fatal case. retry upload + } else { // non fatal case. retry upload LOG_ERROR("stream error occurred: " << stream_status << ". Terminating."); do_retry = true; } @@ -874,4 +955,3 @@ int main(int argc, char *argv[]) { return 0; } - diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp index 34c7d76f..15835c5f 100644 --- a/src/gstreamer/gstkvssink.cpp +++ b/src/gstreamer/gstkvssink.cpp @@ -121,6 +121,7 @@ GST_DEBUG_CATEGORY_STATIC (gst_kvs_sink_debug); #define KVS_ADD_METADATA_NAME "name" #define KVS_ADD_METADATA_VALUE "value" #define KVS_ADD_METADATA_PERSISTENT "persist" + #define KVS_CLIENT_USER_AGENT_NAME "AWS-SDK-KVS-CPP-CLIENT" #define DEFAULT_AUDIO_TRACK_NAME "audio" @@ -1161,29 +1162,30 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads, gboolean persistent; bool is_persist; - if (!gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME)) { - goto CleanUp; - } - - LOG_INFO("received kvs-add-metadata event for " << kvssink->stream_name); - if (NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) || + if (!gst_structure_has_name(structure, KVS_ADD_METADATA_G_STRUCT_NAME) || + NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) || NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) || !gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) { - - LOG_WARN("Event structure contains invalid field: " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name); + ret = FALSE; + LOG_WARN("Event structure is invalid or it contains an invalid field(s): " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name); goto CleanUp; } + LOG_TRACE("Received kvs-add-metadata event for " << kvssink->stream_name); metadata_name = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_NAME)); metadata_value = std::string(gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE)); is_persist = persistent; bool result = data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, is_persist); - if (!result) { - LOG_WARN("Failed to putFragmentMetadata. name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name); - } + gst_event_unref (event); event = NULL; + + if (!result) { + ret = FALSE; + LOG_WARN("Failed to putFragmentMetadata for name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name); + } + break; } case GST_EVENT_EOS: {