diff --git a/include/uvgrtp/rtcp.hh b/include/uvgrtp/rtcp.hh index 6dcc4080..2f25e493 100644 --- a/include/uvgrtp/rtcp.hh +++ b/include/uvgrtp/rtcp.hh @@ -558,6 +558,7 @@ namespace uvgrtp { std::mutex rr_mutex_; std::mutex sdes_mutex_; std::mutex app_mutex_; + mutable std::mutex participants_mutex_; std::unique_ptr report_generator_; diff --git a/src/rtcp.cc b/src/rtcp.cc index da1c46bd..d29bc2f7 100644 --- a/src/rtcp.cc +++ b/src/rtcp.cc @@ -116,12 +116,14 @@ void uvgrtp::rtcp::cleanup_participants() { UVG_LOG_DEBUG("Removing all participants"); + participants_mutex_.lock(); /* free all receiver statistic structs */ for (auto& participant : participants_) { free_participant(std::move(participant.second)); } participants_.clear(); + participants_mutex_.unlock(); for (auto& participant : initial_participants_) { @@ -433,6 +435,7 @@ rtp_error_t uvgrtp::rtcp::add_participant(uint32_t ssrc) return RTP_GENERIC_ERROR; } + participants_mutex_.lock(); /* RTCP is not in use for this media stream, * create a "fake" participant that is only used for storing statistics information */ if (initial_participants_.empty()) @@ -449,6 +452,7 @@ rtp_error_t uvgrtp::rtcp::add_participant(uint32_t ssrc) participants_[ssrc]->sr_frame = nullptr; participants_[ssrc]->sdes_frame = nullptr; participants_[ssrc]->app_frame = nullptr; + participants_mutex_.unlock(); return RTP_OK; } @@ -675,6 +679,7 @@ rtp_error_t uvgrtp::rtcp::install_app_hook(std::function uvgrtp::rtcp::get_participants() const for (auto& i : participants_) { + std::lock_guard prtcp_lock(participants_mutex_); ssrcs.push_back(i.first); } @@ -790,6 +799,7 @@ void uvgrtp::rtcp::zero_stats(uvgrtp::receiver_statistics *stats) bool uvgrtp::rtcp::is_participant(uint32_t ssrc) const { + std::lock_guard prtcp_lock(participants_mutex_); return participants_.find(ssrc) != participants_.end(); } @@ -831,7 +841,7 @@ rtp_error_t uvgrtp::rtcp::init_new_participant(const uvgrtp::frame::rtp_frame *f { return ret; } - + participants_mutex_.lock(); /* Set the probation to MIN_SEQUENTIAL (2) * * What this means is that we must receive at least two packets from SSRC @@ -842,6 +852,7 @@ rtp_error_t uvgrtp::rtcp::init_new_participant(const uvgrtp::frame::rtp_frame *f * Save the timestamp and current NTP timestamp so we can do jitter calculations later on */ participants_[frame->header.ssrc]->stats.initial_rtp = frame->header.timestamp; participants_[frame->header.ssrc]->stats.initial_ntp = uvgrtp::clock::ntp::now(); + participants_mutex_.unlock(); senders_++; @@ -869,6 +880,7 @@ rtp_error_t uvgrtp::rtcp::update_sender_stats(size_t pkt_size) rtp_error_t uvgrtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq) { + std::lock_guard prtcp_lock(participants_mutex_); if (participants_.find(ssrc) == participants_.end()) { return RTP_NOT_FOUND; @@ -883,6 +895,7 @@ rtp_error_t uvgrtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq) rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq) { + std::unique_lock prtcp_lock(participants_mutex_); if (participants_.find(ssrc) == participants_.end()) { UVG_LOG_ERROR("Did not find participant SSRC when updating seq"); @@ -902,6 +915,7 @@ rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq) participants_[ssrc]->stats.max_seq = seq; if (!participants_[ssrc]->probation) { + prtcp_lock.unlock(); uvgrtp::rtcp::init_participant_seq(ssrc, seq); return RTP_OK; } @@ -926,7 +940,9 @@ rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq) /* Two sequential packets -- assume that the other side * restarted without telling us so just re-sync * (i.e., pretend this was the first packet). */ + prtcp_lock.unlock(); uvgrtp::rtcp::init_participant_seq(ssrc, seq); + prtcp_lock.lock(); } else { participants_[ssrc]->stats.bad_seq = (seq + 1) & (RTP_SEQ_MOD - 1); UVG_LOG_ERROR("Invalid sequence number. Seq jump: %u -> %u", participants_[ssrc]->stats.max_seq, seq); @@ -941,6 +957,7 @@ rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq) rtp_error_t uvgrtp::rtcp::reset_rtcp_state(uint32_t ssrc) { + std::lock_guard prtcp_lock(participants_mutex_); if (participants_.find(ssrc) != participants_.end()) { return RTP_SSRC_COLLISION; @@ -953,6 +970,7 @@ rtp_error_t uvgrtp::rtcp::reset_rtcp_state(uint32_t ssrc) bool uvgrtp::rtcp::collision_detected(uint32_t ssrc, const sockaddr_in& src_addr) const { + std::lock_guard prtcp_lock(participants_mutex_); if (participants_.find(ssrc) == participants_.end()) { return false; @@ -969,6 +987,7 @@ bool uvgrtp::rtcp::collision_detected(uint32_t ssrc, const sockaddr_in& src_addr void uvgrtp::rtcp::update_session_statistics(const uvgrtp::frame::rtp_frame *frame) { + std::lock_guard prtcp_lock(participants_mutex_); participants_[frame->header.ssrc]->stats.received_rtp_packet = true; participants_[frame->header.ssrc]->stats.received_pkts += 1; @@ -1286,6 +1305,7 @@ rtp_error_t uvgrtp::rtcp::handle_receiver_report_packet(uint8_t* buffer, size_t& rr_hook_u_(std::unique_ptr(frame)); } else { + std::lock_guard prtcp_lock(participants_mutex_); /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ if (participants_[frame->ssrc]->rr_frame) { @@ -1311,6 +1331,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* buffer, size_t& r add_participant(frame->ssrc); } + participants_mutex_.lock(); participants_[frame->ssrc]->stats.sr_ts = uvgrtp::clock::hrc::now(); frame->sender_info.ntp_msw = ntohl(*(uint32_t*)& buffer[read_ptr]); @@ -1323,6 +1344,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* buffer, size_t& r participants_[frame->ssrc]->stats.lsr = ((frame->sender_info.ntp_msw & 0xffff) << 16) | (frame->sender_info.ntp_lsw >> 16); + participants_mutex_.unlock(); read_reports(buffer, read_ptr, packet_end, frame->header.count, frame->report_blocks); @@ -1337,6 +1359,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* buffer, size_t& r sr_hook_u_(std::unique_ptr(frame)); } else { + std::lock_guard prtcp_lock(participants_mutex_); /* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */ if (participants_[frame->ssrc]->sr_frame) { @@ -1404,7 +1427,7 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t& read_ptr, } else if (sdes_hook_u_) { sdes_hook_u_(std::unique_ptr(frame)); } else { - + std::lock_guard prtcp_lock(participants_mutex_); // Deallocate previous frame from the buffer if it exists, it's going to get overwritten if (participants_[sender_ssrc]->sdes_frame) { @@ -1445,8 +1468,11 @@ rtp_error_t uvgrtp::rtcp::handle_bye_packet(uint8_t* packet, size_t& read_ptr, } UVG_LOG_DEBUG("Destroying participant with BYE"); + + participants_mutex_.lock(); free_participant(std::move(participants_[ssrc])); participants_.erase(ssrc); + participants_mutex_.unlock(); } // TODO: Give BYE packet to user and read optional reason for BYE @@ -1493,7 +1519,7 @@ rtp_error_t uvgrtp::rtcp::handle_app_packet(uint8_t* packet, size_t& read_ptr, } else if (app_hook_u_) { app_hook_u_(std::unique_ptr(frame)); } else { - + std::lock_guard prtcp_lock(participants_mutex_); if (participants_[frame->ssrc]->app_frame) { delete[] participants_[frame->ssrc]->app_frame->payload; @@ -1527,6 +1553,7 @@ rtp_error_t uvgrtp::rtcp::send_rtcp_packet_to_participants(uint8_t* frame, uint3 for (auto& p : participants_) { + std::lock_guard prtcp_lock(participants_mutex_); if (p.second->socket != nullptr) { if ((ret = p.second->socket->sendto(p.second->address, frame, frame_size, 0)) != RTP_OK) @@ -1542,7 +1569,6 @@ rtp_error_t uvgrtp::rtcp::send_rtcp_packet_to_participants(uint8_t* frame, uint3 UVG_LOG_ERROR("Tried to send RTCP packet when socket does not exist!"); } } - delete[] frame; return ret; } @@ -1609,6 +1635,14 @@ rtp_error_t uvgrtp::rtcp::generate_report() std::lock_guard lock(packet_mutex_); rtcp_pkt_sent_count_++; + bool sr_packet = our_role_ == SENDER && our_stats.sent_rtp_packet; + bool rr_packet = our_role_ == RECEIVER || our_stats.sent_rtp_packet == 0; + bool sdes_packet = true; + uint32_t app_packets_size = size_of_ready_app_packets(); + bool bye_packet = !bye_ssrcs_.empty(); + + // Unique lock unlocks when exiting the scope + std::unique_lock prtcp_lock(participants_mutex_); uint8_t reports = 0; for (auto& p : participants_) { @@ -1618,12 +1652,6 @@ rtp_error_t uvgrtp::rtcp::generate_report() } } - bool sr_packet = our_role_ == SENDER && our_stats.sent_rtp_packet; - bool rr_packet = our_role_ == RECEIVER || our_stats.sent_rtp_packet == 0; - bool sdes_packet = true; - uint32_t app_packets_size = size_of_ready_app_packets(); - bool bye_packet = !bye_ssrcs_.empty(); - uint32_t compound_packet_size = size_of_compound_packet(reports, sr_packet, rr_packet, sdes_packet, app_packets_size, bye_packet); if (compound_packet_size == 0) @@ -1722,6 +1750,7 @@ rtp_error_t uvgrtp::rtcp::generate_report() p.second->stats.received_rtp_packet = false; } } + prtcp_lock.unlock(); // End of critical section involving participants_ if (sdes_packet) {