Skip to content

Commit

Permalink
Merge pull request #188 from wowaser/heapBranch
Browse files Browse the repository at this point in the history
rtcp: add thread safety for prticipants_ data structure
  • Loading branch information
jrsnen committed Feb 6, 2023
2 parents 874079e + 97642ff commit 0b788bd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
1 change: 1 addition & 0 deletions include/uvgrtp/rtcp.hh
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ namespace uvgrtp {
std::mutex rr_mutex_;
std::mutex sdes_mutex_;
std::mutex app_mutex_;
mutable std::mutex participants_mutex_;

std::unique_ptr<std::thread> report_generator_;

Expand Down
49 changes: 39 additions & 10 deletions src/rtcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ void uvgrtp::rtcp::cleanup_participants()
{
UVG_LOG_DEBUG("Removing all participants");

participants_mutex_.lock();
/* free all receiver statistic structs */
for (auto& participant : participants_)
{
free_participant(std::move(participant.second));
}
participants_.clear();
participants_mutex_.unlock();

for (auto& participant : initial_participants_)
{
Expand Down Expand Up @@ -433,6 +435,7 @@ rtp_error_t uvgrtp::rtcp::add_participant(uint32_t ssrc)
return RTP_GENERIC_ERROR;
}

participants_mutex_.lock();
/* RTCP is not in use for this media stream,
* create a "fake" participant that is only used for storing statistics information */
if (initial_participants_.empty())
Expand All @@ -449,6 +452,7 @@ rtp_error_t uvgrtp::rtcp::add_participant(uint32_t ssrc)
participants_[ssrc]->sr_frame = nullptr;
participants_[ssrc]->sdes_frame = nullptr;
participants_[ssrc]->app_frame = nullptr;
participants_mutex_.unlock();

return RTP_OK;
}
Expand Down Expand Up @@ -675,6 +679,7 @@ rtp_error_t uvgrtp::rtcp::install_app_hook(std::function<void(std::unique_ptr<uv

uvgrtp::frame::rtcp_sender_report* uvgrtp::rtcp::get_sender_packet(uint32_t ssrc)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
return nullptr;
Expand All @@ -690,6 +695,7 @@ uvgrtp::frame::rtcp_sender_report* uvgrtp::rtcp::get_sender_packet(uint32_t ssrc

uvgrtp::frame::rtcp_receiver_report* uvgrtp::rtcp::get_receiver_packet(uint32_t ssrc)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
return nullptr;
Expand All @@ -705,6 +711,7 @@ uvgrtp::frame::rtcp_receiver_report* uvgrtp::rtcp::get_receiver_packet(uint32_t

uvgrtp::frame::rtcp_sdes_packet* uvgrtp::rtcp::get_sdes_packet(uint32_t ssrc)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
return nullptr;
Expand All @@ -720,6 +727,7 @@ uvgrtp::frame::rtcp_sdes_packet* uvgrtp::rtcp::get_sdes_packet(uint32_t ssrc)

uvgrtp::frame::rtcp_app_packet* uvgrtp::rtcp::get_app_packet(uint32_t ssrc)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
return nullptr;
Expand All @@ -744,6 +752,7 @@ std::vector<uint32_t> uvgrtp::rtcp::get_participants() const

for (auto& i : participants_)
{
std::lock_guard prtcp_lock(participants_mutex_);
ssrcs.push_back(i.first);
}

Expand Down Expand Up @@ -790,6 +799,7 @@ void uvgrtp::rtcp::zero_stats(uvgrtp::receiver_statistics *stats)

bool uvgrtp::rtcp::is_participant(uint32_t ssrc) const
{
std::lock_guard prtcp_lock(participants_mutex_);
return participants_.find(ssrc) != participants_.end();
}

Expand Down Expand Up @@ -831,7 +841,7 @@ rtp_error_t uvgrtp::rtcp::init_new_participant(const uvgrtp::frame::rtp_frame *f
{
return ret;
}

participants_mutex_.lock();
/* Set the probation to MIN_SEQUENTIAL (2)
*
* What this means is that we must receive at least two packets from SSRC
Expand All @@ -842,6 +852,7 @@ rtp_error_t uvgrtp::rtcp::init_new_participant(const uvgrtp::frame::rtp_frame *f
* Save the timestamp and current NTP timestamp so we can do jitter calculations later on */
participants_[frame->header.ssrc]->stats.initial_rtp = frame->header.timestamp;
participants_[frame->header.ssrc]->stats.initial_ntp = uvgrtp::clock::ntp::now();
participants_mutex_.unlock();

senders_++;

Expand Down Expand Up @@ -869,6 +880,7 @@ rtp_error_t uvgrtp::rtcp::update_sender_stats(size_t pkt_size)

rtp_error_t uvgrtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
return RTP_NOT_FOUND;
Expand All @@ -883,6 +895,7 @@ rtp_error_t uvgrtp::rtcp::init_participant_seq(uint32_t ssrc, uint16_t base_seq)

rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq)
{
std::unique_lock prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
UVG_LOG_ERROR("Did not find participant SSRC when updating seq");
Expand All @@ -902,6 +915,7 @@ rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq)
participants_[ssrc]->stats.max_seq = seq;
if (!participants_[ssrc]->probation)
{
prtcp_lock.unlock();
uvgrtp::rtcp::init_participant_seq(ssrc, seq);
return RTP_OK;
}
Expand All @@ -926,7 +940,9 @@ rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq)
/* Two sequential packets -- assume that the other side
* restarted without telling us so just re-sync
* (i.e., pretend this was the first packet). */
prtcp_lock.unlock();
uvgrtp::rtcp::init_participant_seq(ssrc, seq);
prtcp_lock.lock();
} else {
participants_[ssrc]->stats.bad_seq = (seq + 1) & (RTP_SEQ_MOD - 1);
UVG_LOG_ERROR("Invalid sequence number. Seq jump: %u -> %u", participants_[ssrc]->stats.max_seq, seq);
Expand All @@ -941,6 +957,7 @@ rtp_error_t uvgrtp::rtcp::update_participant_seq(uint32_t ssrc, uint16_t seq)

rtp_error_t uvgrtp::rtcp::reset_rtcp_state(uint32_t ssrc)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) != participants_.end())
{
return RTP_SSRC_COLLISION;
Expand All @@ -953,6 +970,7 @@ rtp_error_t uvgrtp::rtcp::reset_rtcp_state(uint32_t ssrc)

bool uvgrtp::rtcp::collision_detected(uint32_t ssrc, const sockaddr_in& src_addr) const
{
std::lock_guard prtcp_lock(participants_mutex_);
if (participants_.find(ssrc) == participants_.end())
{
return false;
Expand All @@ -969,6 +987,7 @@ bool uvgrtp::rtcp::collision_detected(uint32_t ssrc, const sockaddr_in& src_addr

void uvgrtp::rtcp::update_session_statistics(const uvgrtp::frame::rtp_frame *frame)
{
std::lock_guard prtcp_lock(participants_mutex_);
participants_[frame->header.ssrc]->stats.received_rtp_packet = true;

participants_[frame->header.ssrc]->stats.received_pkts += 1;
Expand Down Expand Up @@ -1286,6 +1305,7 @@ rtp_error_t uvgrtp::rtcp::handle_receiver_report_packet(uint8_t* buffer, size_t&
rr_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_receiver_report>(frame));
}
else {
std::lock_guard prtcp_lock(participants_mutex_);
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->rr_frame)
{
Expand All @@ -1311,6 +1331,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* buffer, size_t& r
add_participant(frame->ssrc);
}

participants_mutex_.lock();
participants_[frame->ssrc]->stats.sr_ts = uvgrtp::clock::hrc::now();

frame->sender_info.ntp_msw = ntohl(*(uint32_t*)& buffer[read_ptr]);
Expand All @@ -1323,6 +1344,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* buffer, size_t& r
participants_[frame->ssrc]->stats.lsr =
((frame->sender_info.ntp_msw & 0xffff) << 16) |
(frame->sender_info.ntp_lsw >> 16);
participants_mutex_.unlock();

read_reports(buffer, read_ptr, packet_end, frame->header.count, frame->report_blocks);

Expand All @@ -1337,6 +1359,7 @@ rtp_error_t uvgrtp::rtcp::handle_sender_report_packet(uint8_t* buffer, size_t& r
sr_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_sender_report>(frame));
}
else {
std::lock_guard prtcp_lock(participants_mutex_);
/* Deallocate previous frame from the buffer if it exists, it's going to get overwritten */
if (participants_[frame->ssrc]->sr_frame)
{
Expand Down Expand Up @@ -1404,7 +1427,7 @@ rtp_error_t uvgrtp::rtcp::handle_sdes_packet(uint8_t* packet, size_t& read_ptr,
} else if (sdes_hook_u_) {
sdes_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_sdes_packet>(frame));
} else {

std::lock_guard prtcp_lock(participants_mutex_);
// Deallocate previous frame from the buffer if it exists, it's going to get overwritten
if (participants_[sender_ssrc]->sdes_frame)
{
Expand Down Expand Up @@ -1445,8 +1468,11 @@ rtp_error_t uvgrtp::rtcp::handle_bye_packet(uint8_t* packet, size_t& read_ptr,
}

UVG_LOG_DEBUG("Destroying participant with BYE");

participants_mutex_.lock();
free_participant(std::move(participants_[ssrc]));
participants_.erase(ssrc);
participants_mutex_.unlock();
}

// TODO: Give BYE packet to user and read optional reason for BYE
Expand Down Expand Up @@ -1493,7 +1519,7 @@ rtp_error_t uvgrtp::rtcp::handle_app_packet(uint8_t* packet, size_t& read_ptr,
} else if (app_hook_u_) {
app_hook_u_(std::unique_ptr<uvgrtp::frame::rtcp_app_packet>(frame));
} else {

std::lock_guard prtcp_lock(participants_mutex_);
if (participants_[frame->ssrc]->app_frame)
{
delete[] participants_[frame->ssrc]->app_frame->payload;
Expand Down Expand Up @@ -1527,6 +1553,7 @@ rtp_error_t uvgrtp::rtcp::send_rtcp_packet_to_participants(uint8_t* frame, uint3

for (auto& p : participants_)
{
std::lock_guard prtcp_lock(participants_mutex_);
if (p.second->socket != nullptr)
{
if ((ret = p.second->socket->sendto(p.second->address, frame, frame_size, 0)) != RTP_OK)
Expand All @@ -1542,7 +1569,6 @@ rtp_error_t uvgrtp::rtcp::send_rtcp_packet_to_participants(uint8_t* frame, uint3
UVG_LOG_ERROR("Tried to send RTCP packet when socket does not exist!");
}
}

delete[] frame;
return ret;
}
Expand Down Expand Up @@ -1609,6 +1635,14 @@ rtp_error_t uvgrtp::rtcp::generate_report()
std::lock_guard<std::mutex> lock(packet_mutex_);
rtcp_pkt_sent_count_++;

bool sr_packet = our_role_ == SENDER && our_stats.sent_rtp_packet;
bool rr_packet = our_role_ == RECEIVER || our_stats.sent_rtp_packet == 0;
bool sdes_packet = true;
uint32_t app_packets_size = size_of_ready_app_packets();
bool bye_packet = !bye_ssrcs_.empty();

// Unique lock unlocks when exiting the scope
std::unique_lock prtcp_lock(participants_mutex_);
uint8_t reports = 0;
for (auto& p : participants_)
{
Expand All @@ -1618,12 +1652,6 @@ rtp_error_t uvgrtp::rtcp::generate_report()
}
}

bool sr_packet = our_role_ == SENDER && our_stats.sent_rtp_packet;
bool rr_packet = our_role_ == RECEIVER || our_stats.sent_rtp_packet == 0;
bool sdes_packet = true;
uint32_t app_packets_size = size_of_ready_app_packets();
bool bye_packet = !bye_ssrcs_.empty();

uint32_t compound_packet_size = size_of_compound_packet(reports, sr_packet, rr_packet, sdes_packet, app_packets_size, bye_packet);

if (compound_packet_size == 0)
Expand Down Expand Up @@ -1722,6 +1750,7 @@ rtp_error_t uvgrtp::rtcp::generate_report()
p.second->stats.received_rtp_packet = false;
}
}
prtcp_lock.unlock(); // End of critical section involving participants_

if (sdes_packet)
{
Expand Down

0 comments on commit 0b788bd

Please sign in to comment.