Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard all accesses to publisher->streams with publisher->streams_mutex #3446

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions src/plugins/janus_videoroom.c
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,7 @@ static void janus_videoroom_publisher_dereference_nodebug(janus_videoroom_publis

static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) {
if(p && g_atomic_int_compare_and_exchange(&p->destroyed, 0, 1)) {
janus_mutex_lock(&p->streams_mutex);
/* Forwarders with RTCP support may have an extra reference, stop their source */
janus_mutex_lock(&p->rtp_forwarders_mutex);
if(g_hash_table_size(p->rtp_forwarders) > 0) {
Expand Down Expand Up @@ -2746,6 +2747,7 @@ static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) {
}
}
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
janus_refcount_decrease(&p->ref);
}
}
Expand Down Expand Up @@ -4534,6 +4536,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) {
if(participant->e2ee)
json_object_set_new(info, "e2ee", json_true());
json_t *media = json_array();
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand Down Expand Up @@ -4576,6 +4579,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) {
json_array_append_new(media, m);
temp = temp->next;
}
janus_mutex_unlock(&participant->streams_mutex);
json_object_set_new(info, "streams", media);
}
if(participant != NULL)
Expand Down Expand Up @@ -5821,6 +5825,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
goto prepare_response;
}
janus_refcount_increase(&publisher->ref); /* This is just to handle the request for now */
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
if(publisher->udp_sock <= 0) {
publisher->udp_sock = socket(!ipv6_disabled ? AF_INET6 : AF_INET, SOCK_DGRAM, IPPROTO_UDP);
Expand Down Expand Up @@ -5850,9 +5855,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
json_t *s = json_array_get(streams, i);
json_t *stream_mid = json_object_get(s, "mid");
const char *mid = json_string_value(stream_mid);
janus_mutex_lock(&publisher->streams_mutex);
ps = g_hash_table_lookup(publisher->streams_bymid, mid);
janus_mutex_unlock(&publisher->streams_mutex);
if(ps == NULL) {
/* FIXME Should we return an error instead? */
JANUS_LOG(LOG_WARN, "No such stream with mid '%s', skipping forwarder...\n", mid);
Expand Down Expand Up @@ -6079,7 +6082,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
guint32 audio_handle = 0;
guint32 video_handle[3] = {0, 0, 0};
guint32 data_handle = 0;
janus_mutex_lock(&publisher->streams_mutex);
if(audio_port > 0) {
/* FIXME Find the audio stream */
GList *temp = publisher->streams;
Expand Down Expand Up @@ -6224,7 +6226,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
}
}
janus_mutex_unlock(&publisher->streams_mutex);
if(audio_handle > 0) {
json_object_set_new(rtp_stream, "audio_stream_id", json_integer(audio_handle));
json_object_set_new(rtp_stream, "audio", json_integer(audio_port));
Expand Down Expand Up @@ -6254,6 +6255,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
json_object_set_new(rtp_stream, "warning", json_string("deprecated_api"));
}
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_mutex_unlock(&videoroom->mutex);
/* These two unrefs are related to the message handling */
janus_refcount_decrease(&publisher->ref);
Expand Down Expand Up @@ -6351,6 +6353,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
goto prepare_response;
}
janus_refcount_increase(&publisher->ref); /* Just to handle the message now */
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
/* Find the forwarder by iterating on all the streams */
gboolean found = FALSE;
Expand Down Expand Up @@ -6380,6 +6383,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
temp = temp->next;
}
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_mutex_unlock(&videoroom->mutex);
janus_refcount_decrease(&videoroom->ref);
Expand Down Expand Up @@ -6846,6 +6850,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
/* To see if the participant is talking, we need to find the audio stream(s) */
if(g_atomic_int_get(&p->session->started)) {
gboolean found = FALSE, talking = FALSE;
janus_mutex_lock(&p->streams_mutex);
GList *temp = p->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand All @@ -6856,6 +6861,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
temp = temp->next;
}
janus_mutex_unlock(&p->streams_mutex);
if(found)
json_object_set_new(pl, "talking", talking ? json_true() : json_false());
}
Expand Down Expand Up @@ -6908,9 +6914,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_hash_table_iter_init(&iter, videoroom->participants);
while (!g_atomic_int_get(&videoroom->destroyed) && g_hash_table_iter_next(&iter, NULL, &value)) {
janus_videoroom_publisher *p = value;
janus_mutex_lock(&p->streams_mutex);
janus_mutex_lock(&p->rtp_forwarders_mutex);
if(g_hash_table_size(p->rtp_forwarders) == 0) {
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
continue;
}
json_t *pl = json_object();
Expand Down Expand Up @@ -6948,6 +6956,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
temp = temp->next;
}
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
json_object_set_new(pl, "forwarders", flist);
json_array_append_new(list, pl);
}
Expand Down Expand Up @@ -7000,9 +7009,12 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
/* Something changed */
if(!participant->recording_active) {
/* Not recording (anymore?) */
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_recorder_close(participant);
janus_mutex_unlock(&participant->streams_mutex);
} else if(participant->recording_active && g_atomic_int_get(&participant->session->started)) {
/* We've started recording, send a PLI and go on */
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand All @@ -7013,6 +7025,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
temp = temp->next;
}
janus_mutex_unlock(&participant->streams_mutex);
}
}
janus_mutex_unlock(&participant->rec_mutex);
Expand Down Expand Up @@ -7169,9 +7182,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_snprintf(error_cause, 512, "Only local publishers can be remotized");
goto prepare_response;
}
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
if(g_hash_table_lookup(publisher->remote_recipients, remote_id) != NULL) {
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
JANUS_LOG(LOG_ERR, "Remotization already exists (%s)\n", remote_id);
Expand All @@ -7185,6 +7200,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
if(publisher->udp_sock <= 0 ||
(!ipv6_disabled && setsockopt(publisher->udp_sock, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0)) {
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
JANUS_LOG(LOG_ERR, "Could not open UDP socket for RTP stream for publisher (%s), %d (%s)\n",
Expand All @@ -7195,7 +7211,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
}
/* Add a new RTP forwarder for each of the publisher streams */
janus_mutex_lock(&publisher->streams_mutex);
janus_videoroom_publisher_stream *ps = NULL;
janus_rtp_forwarder *f = NULL;
gboolean rtcp_added = FALSE, add_rtcp = FALSE;
Expand Down Expand Up @@ -7253,7 +7268,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
temp = temp->next;
}
janus_mutex_unlock(&publisher->streams_mutex);
/* Keep track of this remotization */
janus_videoroom_remote_recipient *recipient = g_malloc(sizeof(janus_videoroom_remote_recipient));
recipient->remote_id = g_strdup(remote_id);
Expand All @@ -7264,6 +7278,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_hash_table_insert(publisher->remote_recipients, g_strdup(remote_id), recipient);
/* Done */
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
response = json_object();
json_object_set_new(response, "videoroom", json_string("success"));
json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id));
Expand Down Expand Up @@ -7333,10 +7348,12 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
}
janus_refcount_increase(&publisher->ref);
janus_mutex_unlock(&videoroom->mutex);
janus_mutex_lock(&publisher->streams_mutex);
janus_mutex_lock(&publisher->rtp_forwarders_mutex);
/* Check if we know of this remotization */
if(g_hash_table_remove(publisher->remote_recipients, remote_id) == FALSE) {
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
JANUS_LOG(LOG_ERR, "No such remotization (%s)\n", remote_id);
Expand Down Expand Up @@ -7368,6 +7385,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
temp = temp->next;
}
janus_mutex_unlock(&publisher->rtp_forwarders_mutex);
janus_mutex_unlock(&publisher->streams_mutex);
/* Done */
response = json_object();
json_object_set_new(response, "videoroom", json_string("success"));
Expand Down Expand Up @@ -7801,9 +7819,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
janus_mutex_init(&ps->subscribers_mutex);
janus_mutex_init(&ps->rtp_forwarders_mutex);
ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy);
janus_mutex_lock(&publisher->streams_mutex);
publisher->streams = g_list_append(publisher->streams, ps);
g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps);
g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps);
janus_mutex_unlock(&publisher->streams_mutex);
mindex++;
}
/* Done, spawn a thread for this remote publisher */
Expand Down Expand Up @@ -8082,11 +8102,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps);
changes = TRUE;
}
janus_mutex_unlock(&publisher->streams_mutex);
if(changes) {
/* Notify all other participants this publisher's media has changed */
janus_videoroom_notify_about_publisher(publisher, TRUE);
}
janus_mutex_unlock(&publisher->streams_mutex);
/* Done */
janus_refcount_decrease(&publisher->ref);
janus_refcount_decrease(&videoroom->ref);
Expand Down Expand Up @@ -8352,9 +8372,10 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) {
if(session->participant_type == janus_videoroom_p_type_publisher) {
janus_videoroom_publisher *participant = janus_videoroom_session_get_publisher(session);
/* Notify all other participants that there's a new boy in town */
janus_mutex_lock(&participant->rec_mutex);
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_notify_about_publisher(participant, FALSE);
/* Check if we need to start recording */
janus_mutex_lock(&participant->rec_mutex);
if((participant->room && participant->room->record) || participant->recording_active) {
GList *temp = participant->streams;
while(temp) {
Expand All @@ -8364,6 +8385,7 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) {
}
participant->recording_active = TRUE;
}
janus_mutex_unlock(&participant->streams_mutex);
janus_mutex_unlock(&participant->rec_mutex);
janus_refcount_decrease(&participant->ref);
} else if(session->participant_type == janus_videoroom_p_type_subscriber) {
Expand Down Expand Up @@ -9035,7 +9057,9 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) {
janus_mutex_lock(&participant->rec_mutex);
g_free(participant->recording_base);
participant->recording_base = NULL;
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_recorder_close(participant);
janus_mutex_unlock(&participant->streams_mutex)
janus_mutex_unlock(&participant->rec_mutex);
participant->acodec = JANUS_AUDIOCODEC_NONE;
participant->vcodec = JANUS_VIDEOCODEC_NONE;
Expand Down Expand Up @@ -9632,6 +9656,7 @@ static void *janus_videoroom_handler(void *data) {
/* Add proper info on all the streams */
gboolean audio_added = FALSE, video_added = FALSE, talking_found = FALSE, talking = FALSE;
json_t *media = json_array();
janus_mutex_lock(&p->streams_mutex);
GList *temp = p->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand Down Expand Up @@ -9688,6 +9713,7 @@ static void *janus_videoroom_handler(void *data) {
json_array_append_new(media, info);
temp = temp->next;
}
janus_mutex_unlock(&p->streams_mutex);
json_object_set_new(pl, "streams", media);
if(talking_found)
json_object_set_new(pl, "talking", talking ? json_true() : json_false());
Expand Down Expand Up @@ -10731,9 +10757,12 @@ static void *janus_videoroom_handler(void *data) {
/* Something changed */
if(!participant->recording_active) {
/* Not recording (anymore?) */
janus_mutex_lock(&participant->streams_mutex)
janus_videoroom_recorder_close(participant);
janus_mutex_unlock(&participant->streams_mutex)
} else if(participant->recording_active && g_atomic_int_get(&participant->session->started)) {
/* We've started recording, send a PLI/FIR and go on */
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand All @@ -10744,6 +10773,7 @@ static void *janus_videoroom_handler(void *data) {
}
temp = temp->next;
}
janus_mutex_unlock(&participant->streams_mutex);
}
}
janus_mutex_unlock(&participant->rec_mutex);
Expand Down Expand Up @@ -10787,10 +10817,10 @@ static void *janus_videoroom_handler(void *data) {
}
}
}
janus_mutex_unlock(&participant->streams_mutex);
/* If at least a description changed, notify everyone else about the publisher details */
if(desc_updated)
janus_videoroom_notify_about_publisher(participant, TRUE);
janus_mutex_unlock(&participant->streams_mutex);
}
/* Done */
event = json_object();
Expand Down Expand Up @@ -11830,6 +11860,7 @@ static void *janus_videoroom_handler(void *data) {
feeds = json_array();
json_object_set_new(root, "streams", feeds);
janus_refcount_increase(&publisher->ref);
janus_mutex_lock(&publisher->streams_mutex);
GList *temp = publisher->streams, *touched_already = NULL;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
Expand Down Expand Up @@ -11857,6 +11888,7 @@ static void *janus_videoroom_handler(void *data) {
}
temp = temp->next;
}
janus_mutex_unlock(&publisher->streams_mutex);
g_list_free(touched_already);
janus_refcount_decrease(&publisher->ref);
/* Take note of the fact this is a legacy request */
Expand Down Expand Up @@ -12883,13 +12915,15 @@ static void *janus_videoroom_handler(void *data) {
/* Is this room recorded, or are we recording this publisher already? */
janus_mutex_lock(&participant->rec_mutex);
if(videoroom->record || participant->recording_active) {
janus_mutex_lock(&participant->streams_mutex);
GList *temp = participant->streams;
while(temp) {
janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data;
janus_videoroom_recorder_create(ps);
temp = temp->next;
}
participant->recording_active = TRUE;
janus_mutex_unlock(&participant->streams_mutex);
}
janus_mutex_unlock(&participant->rec_mutex);
/* Send the answer back to the publisher */
Expand All @@ -12910,7 +12944,9 @@ static void *janus_videoroom_handler(void *data) {
/* If this is an update/renegotiation, notify participants about this */
if(sdp_update && g_atomic_int_get(&session->started)) {
/* Notify all other participants this publisher's media has changed */
janus_mutex_lock(&participant->streams_mutex);
janus_videoroom_notify_about_publisher(participant, TRUE);
janus_mutex_unlock(&participant->streams_mutex);
}
/* Done */
if(res != JANUS_OK) {
Expand Down Expand Up @@ -13223,6 +13259,7 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf,
janus_videoroom_publisher *p = ps->publisher;
if(p == NULL || g_atomic_int_get(&p->destroyed))
return;
janus_mutex_lock(&p->streams_mutex);
janus_mutex_lock(&p->rtp_forwarders_mutex);
if(g_hash_table_size(p->rtp_forwarders) == 0) {
janus_mutex_unlock(&p->rtp_forwarders_mutex);
Expand Down Expand Up @@ -13256,6 +13293,7 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf,
temp = temp->next;
}
janus_mutex_unlock(&p->rtp_forwarders_mutex);
janus_mutex_unlock(&p->streams_mutex);
if(found)
janus_videoroom_reqpli(ps, "RTCP from remotized forwarder");
}
Expand Down Expand Up @@ -13468,7 +13506,9 @@ static void *janus_videoroom_remote_publisher_thread(void *user_data) {
string_ids ? (gpointer)g_strdup(publisher->user_id_str) : (gpointer)janus_uint64_dup(publisher->user_id),
publisher);
/* Let's also notify all other participants that the publisher is here */
janus_mutex_lock(&publisher->streams_mutex);
janus_videoroom_notify_about_publisher(publisher, FALSE);
janus_mutex_unlock(&publisher->streams_mutex);

/* Loop */
int num = 0, i = 0;
Expand Down Expand Up @@ -13652,7 +13692,9 @@ static void *janus_videoroom_remote_publisher_thread(void *user_data) {
janus_mutex_lock(&publisher->rec_mutex);
g_free(publisher->recording_base);
publisher->recording_base = NULL;
janus_mutex_lock(&publisher->streams_mutex)
janus_videoroom_recorder_close(publisher);
janus_mutex_unlock(&publisher->streams_mutex)
janus_mutex_unlock(&publisher->rec_mutex);
publisher->acodec = JANUS_AUDIOCODEC_NONE;
publisher->vcodec = JANUS_VIDEOCODEC_NONE;
Expand Down
Loading