From a98cea863dadae4f934c461ef57f665a94698388 Mon Sep 17 00:00:00 2001 From: Tommi Date: Wed, 13 May 2020 15:06:19 +0200 Subject: [PATCH] Remove the PendingTaskSafetyFlag::Pointer type add ScopedTaskSafety. ScopedTaskSafety simplifies usage of PendingTaskSafetyFlag, so this CL also includes ToQueuedTask support for ScopedTaskSafety and test updates. This is following up on feedback in the following CL: https://webrtc-review.googlesource.com/c/src/+/174262 Change-Id: Idd38dfc1914b24a05fdc4ad256b409dcf1795fc0 Bug: none Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174740 Reviewed-by: Karl Wiberg Commit-Queue: Tommi Cr-Commit-Position: refs/heads/master@{#31241} --- .../task_utils/pending_task_safety_flag.cc | 2 +- .../task_utils/pending_task_safety_flag.h | 28 +++++++++- .../pending_task_safety_flag_unittest.cc | 32 ++++++++---- rtc_base/task_utils/to_queued_task.h | 22 ++++++-- .../task_utils/to_queued_task_unittest.cc | 3 +- video/call_stats2.cc | 9 ++-- video/call_stats2.h | 3 +- video/receive_statistics_proxy2.cc | 52 +++++++++---------- video/receive_statistics_proxy2.h | 3 +- video/rtp_streams_synchronizer2.cc | 14 +++-- video/rtp_streams_synchronizer2.h | 3 +- video/video_receive_stream2.cc | 5 +- video/video_receive_stream2.h | 3 +- 13 files changed, 107 insertions(+), 72 deletions(-) diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc index 307d2d594c..4be2131f3f 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.cc +++ b/rtc_base/task_utils/pending_task_safety_flag.cc @@ -15,7 +15,7 @@ namespace webrtc { // static -PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() { +rtc::scoped_refptr PendingTaskSafetyFlag::Create() { return new rtc::RefCountedObject(); } diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h index 1b301c8034..580fb3f912 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.h +++ b/rtc_base/task_utils/pending_task_safety_flag.h @@ -36,12 +36,17 @@ namespace webrtc { // MyMethod(); // })); // +// Or implicitly by letting ToQueuedTask do the checking: +// +// // Running outside of the main thread. +// my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_, +// [this]() { MyMethod(); })); +// // Note that checking the state only works on the construction/destruction // thread of the ReceiveStatisticsProxy instance. class PendingTaskSafetyFlag : public rtc::RefCountInterface { public: - using Pointer = rtc::scoped_refptr; - static Pointer Create(); + static rtc::scoped_refptr Create(); ~PendingTaskSafetyFlag() = default; @@ -56,6 +61,25 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface { SequenceChecker main_sequence_; }; +// Makes using PendingTaskSafetyFlag very simple. Automatic PTSF creation +// and signalling of destruction when the ScopedTaskSafety instance goes out +// of scope. +// Should be used by the class that wants tasks dropped after destruction. +// Requirements are that the instance be constructed and destructed on +// the same thread as the potentially dropped tasks would be running on. +class ScopedTaskSafety { + public: + ScopedTaskSafety() = default; + ~ScopedTaskSafety() { flag_->SetNotAlive(); } + + // Returns a new reference to the safety flag. + rtc::scoped_refptr flag() const { return flag_; } + + private: + rtc::scoped_refptr flag_ = + PendingTaskSafetyFlag::Create(); +}; + } // namespace webrtc #endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc index 0c1c3c8e52..6df2fe2ffb 100644 --- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc +++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc @@ -29,7 +29,7 @@ using ::testing::Return; } // namespace TEST(PendingTaskSafetyFlagTest, Basic) { - PendingTaskSafetyFlag::Pointer safety_flag; + rtc::scoped_refptr safety_flag; { // Scope for the |owner| instance. class Owner { @@ -37,12 +37,27 @@ TEST(PendingTaskSafetyFlagTest, Basic) { Owner() = default; ~Owner() { flag_->SetNotAlive(); } - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + rtc::scoped_refptr flag_ = + PendingTaskSafetyFlag::Create(); } owner; EXPECT_TRUE(owner.flag_->alive()); safety_flag = owner.flag_; EXPECT_TRUE(safety_flag->alive()); } + // |owner| now out of scope. + EXPECT_FALSE(safety_flag->alive()); +} + +TEST(PendingTaskSafetyFlagTest, BasicScoped) { + rtc::scoped_refptr safety_flag; + { + struct Owner { + ScopedTaskSafety safety; + } owner; + safety_flag = owner.safety.flag(); + EXPECT_TRUE(safety_flag->alive()); + } + // |owner| now out of scope. EXPECT_FALSE(safety_flag->alive()); } @@ -72,7 +87,8 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) { private: TaskQueueBase* const tq_main_; bool stuff_done_ = false; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + rtc::scoped_refptr flag_{ + PendingTaskSafetyFlag::Create()}; }; std::unique_ptr owner; @@ -106,22 +122,18 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) { } ~Owner() { RTC_DCHECK(tq_main_->IsCurrent()); - flag_->SetNotAlive(); } void DoStuff() { RTC_DCHECK(!tq_main_->IsCurrent()); - tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() { - if (!safe->alive()) - return; - *stuff_done_ = true; - })); + tq_main_->PostTask( + ToQueuedTask(safety_, [this]() { *stuff_done_ = true; })); } private: TaskQueueBase* const tq_main_; bool* const stuff_done_; - PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()}; + ScopedTaskSafety safety_; }; std::unique_ptr owner; diff --git a/rtc_base/task_utils/to_queued_task.h b/rtc_base/task_utils/to_queued_task.h index cc9325ebd6..07ab0ebe26 100644 --- a/rtc_base/task_utils/to_queued_task.h +++ b/rtc_base/task_utils/to_queued_task.h @@ -39,7 +39,7 @@ class ClosureTask : public QueuedTask { template class SafetyClosureTask : public QueuedTask { public: - explicit SafetyClosureTask(PendingTaskSafetyFlag::Pointer safety, + explicit SafetyClosureTask(rtc::scoped_refptr safety, Closure&& closure) : closure_(std::forward(closure)), safety_flag_(std::move(safety)) {} @@ -52,7 +52,7 @@ class SafetyClosureTask : public QueuedTask { } typename std::decay::type closure_; - PendingTaskSafetyFlag::Pointer safety_flag_; + rtc::scoped_refptr safety_flag_; }; // Extends ClosureTask to also allow specifying cleanup code. @@ -81,13 +81,25 @@ std::unique_ptr ToQueuedTask(Closure&& closure) { } template -std::unique_ptr ToQueuedTask(PendingTaskSafetyFlag::Pointer safety, - Closure&& closure) { +std::unique_ptr ToQueuedTask( + rtc::scoped_refptr safety, + Closure&& closure) { return std::make_unique>( std::move(safety), std::forward(closure)); } -template +template +std::unique_ptr ToQueuedTask(const ScopedTaskSafety& safety, + Closure&& closure) { + return ToQueuedTask(safety.flag(), std::forward(closure)); +} + +template ::type>::type, + ScopedTaskSafety>::value>::type* = nullptr> std::unique_ptr ToQueuedTask(Closure&& closure, Cleanup&& cleanup) { return std::make_unique< webrtc_new_closure_impl::ClosureTaskWithCleanup>( diff --git a/rtc_base/task_utils/to_queued_task_unittest.cc b/rtc_base/task_utils/to_queued_task_unittest.cc index e98c81e9ce..261b9e891b 100644 --- a/rtc_base/task_utils/to_queued_task_unittest.cc +++ b/rtc_base/task_utils/to_queued_task_unittest.cc @@ -127,7 +127,8 @@ TEST(ToQueuedTaskTest, AcceptsMoveOnlyCleanup) { } TEST(ToQueuedTaskTest, PendingTaskSafetyFlag) { - PendingTaskSafetyFlag::Pointer flag(PendingTaskSafetyFlag::Create()); + rtc::scoped_refptr flag = + PendingTaskSafetyFlag::Create(); int count = 0; // Create two identical tasks that increment the |count|. diff --git a/video/call_stats2.cc b/video/call_stats2.cc index af0da0f702..ce68127490 100644 --- a/video/call_stats2.cc +++ b/video/call_stats2.cc @@ -76,16 +76,13 @@ CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue) RTC_DCHECK(task_queue_); process_thread_checker_.Detach(); task_queue_->PostDelayedTask( - ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), - kUpdateIntervalMs); + ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs); } CallStats::~CallStats() { RTC_DCHECK_RUN_ON(&construction_thread_checker_); RTC_DCHECK(observers_.empty()); - task_safety_flag_->SetNotAlive(); - UpdateHistograms(); } @@ -98,7 +95,7 @@ void CallStats::RunTimer() { last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds(); task_queue_->PostDelayedTask( - ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), interval); + ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval); } void CallStats::UpdateAndReport() { @@ -156,7 +153,7 @@ void CallStats::OnRttUpdate(int64_t rtt) { RTC_DCHECK_RUN_ON(&process_thread_checker_); int64_t now_ms = clock_->TimeInMilliseconds(); - task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, rtt, now_ms]() { + task_queue_->PostTask(ToQueuedTask(task_safety_, [this, rtt, now_ms]() { RTC_DCHECK_RUN_ON(&construction_thread_checker_); reports_.push_back(RttTime(rtt, now_ms)); if (time_of_first_rtt_ms_ == -1) diff --git a/video/call_stats2.h b/video/call_stats2.h index f06d33daf7..49d2db7d31 100644 --- a/video/call_stats2.h +++ b/video/call_stats2.h @@ -139,8 +139,7 @@ class CallStats { TaskQueueBase* const task_queue_; // Used to signal destruction to potentially pending tasks. - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; RTC_DISALLOW_COPY_AND_ASSIGN(CallStats); }; diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc index b818eae018..79684f21e6 100644 --- a/video/receive_statistics_proxy2.cc +++ b/video/receive_statistics_proxy2.cc @@ -129,7 +129,6 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy( ReceiveStatisticsProxy::~ReceiveStatisticsProxy() { RTC_DCHECK_RUN_ON(&main_thread_); - task_safety_flag_->SetNotAlive(); } void ReceiveStatisticsProxy::UpdateHistograms( @@ -689,18 +688,17 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const { void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [payload_type, this]() { - RTC_DCHECK_RUN_ON(&main_thread_); - stats_.current_payload_type = payload_type; - })); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [payload_type, this]() { + RTC_DCHECK_RUN_ON(&main_thread_); + stats_.current_payload_type = payload_type; + })); } void ReceiveStatisticsProxy::OnDecoderImplementationName( const char* implementation_name) { RTC_DCHECK_RUN_ON(&decode_queue_); worker_thread_->PostTask(ToQueuedTask( - task_safety_flag_, [name = std::string(implementation_name), this]() { + task_safety_, [name = std::string(implementation_name), this]() { RTC_DCHECK_RUN_ON(&main_thread_); stats_.decoder_implementation_name = name; })); @@ -715,7 +713,7 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated( int render_delay_ms) { RTC_DCHECK_RUN_ON(&decode_queue_); worker_thread_->PostTask(ToQueuedTask( - task_safety_flag_, + task_safety_, [max_decode_ms, current_delay_ms, target_delay_ms, jitter_buffer_ms, min_playout_delay_ms, render_delay_ms, this]() { RTC_DCHECK_RUN_ON(&main_thread_); @@ -742,7 +740,7 @@ void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) { void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated( const TimingFrameInfo& info) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [info, this]() { + worker_thread_->PostTask(ToQueuedTask(task_safety_, [info, this]() { RTC_DCHECK_RUN_ON(&main_thread_); if (info.flags != VideoSendTiming::kInvalid) { int64_t now_ms = clock_->TimeInMilliseconds(); @@ -777,11 +775,11 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated( // [main] worker thread. // So until the sender implementation has been updated, we work around this // here by posting the update to the expected thread. We make a by value - // copy of the |task_safety_flag_| to handle the case if the queued task + // copy of the |task_safety_| to handle the case if the queued task // runs after the |ReceiveStatisticsProxy| has been deleted. In such a // case the packet_counter update won't be recorded. worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [ssrc, packet_counter, this]() { + ToQueuedTask(task_safety_, [ssrc, packet_counter, this]() { RtcpPacketTypesCounterUpdated(ssrc, packet_counter); })); return; @@ -810,7 +808,7 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame, // "com.apple.coremedia.decompressionsession.clientcallback" VideoFrameMetaData meta(frame, clock_->CurrentTime()); worker_thread_->PostTask(ToQueuedTask( - task_safety_flag_, [meta, qp, decode_time_ms, content_type, this]() { + task_safety_, [meta, qp, decode_time_ms, content_type, this]() { OnDecodedFrame(meta, qp, decode_time_ms, content_type); })); } @@ -936,8 +934,8 @@ void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms, RTC_DCHECK_RUN_ON(&incoming_render_queue_); int64_t now_ms = clock_->TimeInMilliseconds(); worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [video_playout_ntp_ms, sync_offset_ms, - estimated_freq_khz, now_ms, this]() { + ToQueuedTask(task_safety_, [video_playout_ntp_ms, sync_offset_ms, + estimated_freq_khz, now_ms, this]() { RTC_DCHECK_RUN_ON(&main_thread_); sync_offset_counter_.Add(std::abs(sync_offset_ms)); stats_.sync_offset_ms = sync_offset_ms; @@ -990,24 +988,22 @@ void ReceiveStatisticsProxy::OnCompleteFrame(bool is_keyframe, void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [frames_dropped, this]() { - RTC_DCHECK_RUN_ON(&main_thread_); - stats_.frames_dropped += frames_dropped; - })); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [frames_dropped, this]() { + RTC_DCHECK_RUN_ON(&main_thread_); + stats_.frames_dropped += frames_dropped; + })); } void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) { RTC_DCHECK_RUN_ON(&decode_queue_); - worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [codec_type, qp, this]() { - RTC_DCHECK_RUN_ON(&main_thread_); - last_codec_type_ = codec_type; - if (last_codec_type_ == kVideoCodecVP8 && qp != -1) { - qp_counters_.vp8.Add(qp); - qp_sample_.Add(qp); - } - })); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [codec_type, qp, this]() { + RTC_DCHECK_RUN_ON(&main_thread_); + last_codec_type_ = codec_type; + if (last_codec_type_ == kVideoCodecVP8 && qp != -1) { + qp_counters_.vp8.Add(qp); + qp_sample_.Add(qp); + } + })); } void ReceiveStatisticsProxy::OnStreamInactive() { diff --git a/video/receive_statistics_proxy2.h b/video/receive_statistics_proxy2.h index d6f6f1cc21..1357c407ad 100644 --- a/video/receive_statistics_proxy2.h +++ b/video/receive_statistics_proxy2.h @@ -211,8 +211,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback, // methods are invoked on such as GetStats(). TaskQueueBase* const worker_thread_; - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; SequenceChecker decode_queue_; rtc::ThreadChecker main_thread_; diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc index 116cf2879b..7e3bed1467 100644 --- a/video/rtp_streams_synchronizer2.cc +++ b/video/rtp_streams_synchronizer2.cc @@ -47,7 +47,6 @@ RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue, RtpStreamsSynchronizer::~RtpStreamsSynchronizer() { RTC_DCHECK_RUN_ON(&main_checker_); - task_safety_flag_->SetNotAlive(); } void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { @@ -85,13 +84,12 @@ void RtpStreamsSynchronizer::QueueTimer() { } RTC_DCHECK_LE(delay, kSyncIntervalMs); - task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] { - if (!safety->alive()) - return; - RTC_DCHECK_RUN_ON(&main_checker_); - timer_running_ = false; - UpdateDelay(); - }), + task_queue_->PostDelayedTask(ToQueuedTask(task_safety_, + [this] { + RTC_DCHECK_RUN_ON(&main_checker_); + timer_running_ = false; + UpdateDelay(); + }), delay); } diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h index 353434e6a9..83dd0fb6f2 100644 --- a/video/rtp_streams_synchronizer2.h +++ b/video/rtp_streams_synchronizer2.h @@ -70,8 +70,7 @@ class RtpStreamsSynchronizer { bool timer_running_ RTC_GUARDED_BY(main_checker_) = false; // Used to signal destruction to potentially pending tasks. - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; }; } // namespace internal diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc index 510c2602c4..b1b482da29 100644 --- a/video/video_receive_stream2.cc +++ b/video/video_receive_stream2.cc @@ -269,7 +269,6 @@ VideoReceiveStream2::~VideoReceiveStream2() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString(); Stop(); - task_safety_flag_->SetNotAlive(); } void VideoReceiveStream2::SignalNetworkState(NetworkState state) { @@ -491,7 +490,7 @@ void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) { VideoFrameMetaData frame_meta(video_frame, clock_->CurrentTime()); worker_thread_->PostTask( - ToQueuedTask(task_safety_flag_, [frame_meta, this]() { + ToQueuedTask(task_safety_, [frame_meta, this]() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); int64_t video_playout_ntp_ms; int64_t sync_offset_ms; @@ -703,7 +702,7 @@ void VideoReceiveStream2::HandleFrameBufferTimeout() { // check if we have received a packet within the last 5 seconds. bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000; if (!stream_is_active) { - worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [this]() { + worker_thread_->PostTask(ToQueuedTask(task_safety_, [this]() { RTC_DCHECK_RUN_ON(&worker_sequence_checker_); stats_proxy_.OnStreamInactive(); })); diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h index bbed08a7a6..f8cd65dc9d 100644 --- a/video/video_receive_stream2.h +++ b/video/video_receive_stream2.h @@ -255,8 +255,7 @@ class VideoReceiveStream2 : public webrtc::VideoReceiveStream, rtc::TaskQueue decode_queue_; // Used to signal destruction to potentially pending tasks. - PendingTaskSafetyFlag::Pointer task_safety_flag_ = - PendingTaskSafetyFlag::Create(); + ScopedTaskSafety task_safety_; }; } // namespace internal } // namespace webrtc