Skip to content

Commit

Permalink
Remove the PendingTaskSafetyFlag::Pointer type add ScopedTaskSafety.
Browse files Browse the repository at this point in the history
ScopedTaskSafety simplifies usage of PendingTaskSafetyFlag,
so this CL also includes ToQueuedTask support for ScopedTaskSafety
and test updates.

This is following up on feedback in the following CL:
https://webrtc-review.googlesource.com/c/src/+/174262

Change-Id: Idd38dfc1914b24a05fdc4ad256b409dcf1795fc0
Bug: none
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174740
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31241}
  • Loading branch information
Tommi authored and Commit Bot committed May 13, 2020
1 parent ff88a64 commit a98cea8
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 72 deletions.
2 changes: 1 addition & 1 deletion rtc_base/task_utils/pending_task_safety_flag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
namespace webrtc {

// static
PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::Create() {
return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
}

Expand Down
28 changes: 26 additions & 2 deletions rtc_base/task_utils/pending_task_safety_flag.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ namespace webrtc {
// MyMethod();
// }));
//
// Or implicitly by letting ToQueuedTask do the checking:
//
// // Running outside of the main thread.
// my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_,
// [this]() { MyMethod(); }));
//
// Note that checking the state only works on the construction/destruction
// thread of the ReceiveStatisticsProxy instance.
class PendingTaskSafetyFlag : public rtc::RefCountInterface {
public:
using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
static Pointer Create();
static rtc::scoped_refptr<PendingTaskSafetyFlag> Create();

~PendingTaskSafetyFlag() = default;

Expand All @@ -56,6 +61,25 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface {
SequenceChecker main_sequence_;
};

// Makes using PendingTaskSafetyFlag very simple. Automatic PTSF creation
// and signalling of destruction when the ScopedTaskSafety instance goes out
// of scope.
// Should be used by the class that wants tasks dropped after destruction.
// Requirements are that the instance be constructed and destructed on
// the same thread as the potentially dropped tasks would be running on.
class ScopedTaskSafety {
public:
ScopedTaskSafety() = default;
~ScopedTaskSafety() { flag_->SetNotAlive(); }

// Returns a new reference to the safety flag.
rtc::scoped_refptr<PendingTaskSafetyFlag> flag() const { return flag_; }

private:
rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ =
PendingTaskSafetyFlag::Create();
};

} // namespace webrtc

#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
32 changes: 22 additions & 10 deletions rtc_base/task_utils/pending_task_safety_flag_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,35 @@ using ::testing::Return;
} // namespace

TEST(PendingTaskSafetyFlagTest, Basic) {
PendingTaskSafetyFlag::Pointer safety_flag;
rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag;
{
// Scope for the |owner| instance.
class Owner {
public:
Owner() = default;
~Owner() { flag_->SetNotAlive(); }

PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ =
PendingTaskSafetyFlag::Create();
} owner;
EXPECT_TRUE(owner.flag_->alive());
safety_flag = owner.flag_;
EXPECT_TRUE(safety_flag->alive());
}
// |owner| now out of scope.
EXPECT_FALSE(safety_flag->alive());
}

TEST(PendingTaskSafetyFlagTest, BasicScoped) {
rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag;
{
struct Owner {
ScopedTaskSafety safety;
} owner;
safety_flag = owner.safety.flag();
EXPECT_TRUE(safety_flag->alive());
}
// |owner| now out of scope.
EXPECT_FALSE(safety_flag->alive());
}

Expand Down Expand Up @@ -72,7 +87,8 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
private:
TaskQueueBase* const tq_main_;
bool stuff_done_ = false;
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
rtc::scoped_refptr<PendingTaskSafetyFlag> flag_{
PendingTaskSafetyFlag::Create()};
};

std::unique_ptr<Owner> owner;
Expand Down Expand Up @@ -106,22 +122,18 @@ TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
}
~Owner() {
RTC_DCHECK(tq_main_->IsCurrent());
flag_->SetNotAlive();
}

void DoStuff() {
RTC_DCHECK(!tq_main_->IsCurrent());
tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
if (!safe->alive())
return;
*stuff_done_ = true;
}));
tq_main_->PostTask(
ToQueuedTask(safety_, [this]() { *stuff_done_ = true; }));
}

private:
TaskQueueBase* const tq_main_;
bool* const stuff_done_;
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
ScopedTaskSafety safety_;
};

std::unique_ptr<Owner> owner;
Expand Down
22 changes: 17 additions & 5 deletions rtc_base/task_utils/to_queued_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ClosureTask : public QueuedTask {
template <typename Closure>
class SafetyClosureTask : public QueuedTask {
public:
explicit SafetyClosureTask(PendingTaskSafetyFlag::Pointer safety,
explicit SafetyClosureTask(rtc::scoped_refptr<PendingTaskSafetyFlag> safety,
Closure&& closure)
: closure_(std::forward<Closure>(closure)),
safety_flag_(std::move(safety)) {}
Expand All @@ -52,7 +52,7 @@ class SafetyClosureTask : public QueuedTask {
}

typename std::decay<Closure>::type closure_;
PendingTaskSafetyFlag::Pointer safety_flag_;
rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag_;
};

// Extends ClosureTask to also allow specifying cleanup code.
Expand Down Expand Up @@ -81,13 +81,25 @@ std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure) {
}

template <typename Closure>
std::unique_ptr<QueuedTask> ToQueuedTask(PendingTaskSafetyFlag::Pointer safety,
Closure&& closure) {
std::unique_ptr<QueuedTask> ToQueuedTask(
rtc::scoped_refptr<PendingTaskSafetyFlag> safety,
Closure&& closure) {
return std::make_unique<webrtc_new_closure_impl::SafetyClosureTask<Closure>>(
std::move(safety), std::forward<Closure>(closure));
}

template <typename Closure, typename Cleanup>
template <typename Closure>
std::unique_ptr<QueuedTask> ToQueuedTask(const ScopedTaskSafety& safety,
Closure&& closure) {
return ToQueuedTask(safety.flag(), std::forward<Closure>(closure));
}

template <typename Closure,
typename Cleanup,
typename std::enable_if<!std::is_same<
typename std::remove_const<
typename std::remove_reference<Closure>::type>::type,
ScopedTaskSafety>::value>::type* = nullptr>
std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure, Cleanup&& cleanup) {
return std::make_unique<
webrtc_new_closure_impl::ClosureTaskWithCleanup<Closure, Cleanup>>(
Expand Down
3 changes: 2 additions & 1 deletion rtc_base/task_utils/to_queued_task_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ TEST(ToQueuedTaskTest, AcceptsMoveOnlyCleanup) {
}

TEST(ToQueuedTaskTest, PendingTaskSafetyFlag) {
PendingTaskSafetyFlag::Pointer flag(PendingTaskSafetyFlag::Create());
rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
PendingTaskSafetyFlag::Create();

int count = 0;
// Create two identical tasks that increment the |count|.
Expand Down
9 changes: 3 additions & 6 deletions video/call_stats2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,13 @@ CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue)
RTC_DCHECK(task_queue_);
process_thread_checker_.Detach();
task_queue_->PostDelayedTask(
ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }),
kUpdateIntervalMs);
ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs);
}

CallStats::~CallStats() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
RTC_DCHECK(observers_.empty());

task_safety_flag_->SetNotAlive();

UpdateHistograms();
}

Expand All @@ -98,7 +95,7 @@ void CallStats::RunTimer() {
last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds();

task_queue_->PostDelayedTask(
ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), interval);
ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval);
}

void CallStats::UpdateAndReport() {
Expand Down Expand Up @@ -156,7 +153,7 @@ void CallStats::OnRttUpdate(int64_t rtt) {
RTC_DCHECK_RUN_ON(&process_thread_checker_);

int64_t now_ms = clock_->TimeInMilliseconds();
task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, rtt, now_ms]() {
task_queue_->PostTask(ToQueuedTask(task_safety_, [this, rtt, now_ms]() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
reports_.push_back(RttTime(rtt, now_ms));
if (time_of_first_rtt_ms_ == -1)
Expand Down
3 changes: 1 addition & 2 deletions video/call_stats2.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ class CallStats {
TaskQueueBase* const task_queue_;

// Used to signal destruction to potentially pending tasks.
PendingTaskSafetyFlag::Pointer task_safety_flag_ =
PendingTaskSafetyFlag::Create();
ScopedTaskSafety task_safety_;

RTC_DISALLOW_COPY_AND_ASSIGN(CallStats);
};
Expand Down
52 changes: 24 additions & 28 deletions video/receive_statistics_proxy2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy(

ReceiveStatisticsProxy::~ReceiveStatisticsProxy() {
RTC_DCHECK_RUN_ON(&main_thread_);
task_safety_flag_->SetNotAlive();
}

void ReceiveStatisticsProxy::UpdateHistograms(
Expand Down Expand Up @@ -689,18 +688,17 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const {

void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(
ToQueuedTask(task_safety_flag_, [payload_type, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.current_payload_type = payload_type;
}));
worker_thread_->PostTask(ToQueuedTask(task_safety_, [payload_type, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.current_payload_type = payload_type;
}));
}

void ReceiveStatisticsProxy::OnDecoderImplementationName(
const char* implementation_name) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(
task_safety_flag_, [name = std::string(implementation_name), this]() {
task_safety_, [name = std::string(implementation_name), this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.decoder_implementation_name = name;
}));
Expand All @@ -715,7 +713,7 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
int render_delay_ms) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(
task_safety_flag_,
task_safety_,
[max_decode_ms, current_delay_ms, target_delay_ms, jitter_buffer_ms,
min_playout_delay_ms, render_delay_ms, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
Expand All @@ -742,7 +740,7 @@ void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) {
void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
const TimingFrameInfo& info) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [info, this]() {
worker_thread_->PostTask(ToQueuedTask(task_safety_, [info, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
if (info.flags != VideoSendTiming::kInvalid) {
int64_t now_ms = clock_->TimeInMilliseconds();
Expand Down Expand Up @@ -777,11 +775,11 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
// [main] worker thread.
// So until the sender implementation has been updated, we work around this
// here by posting the update to the expected thread. We make a by value
// copy of the |task_safety_flag_| to handle the case if the queued task
// copy of the |task_safety_| to handle the case if the queued task
// runs after the |ReceiveStatisticsProxy| has been deleted. In such a
// case the packet_counter update won't be recorded.
worker_thread_->PostTask(
ToQueuedTask(task_safety_flag_, [ssrc, packet_counter, this]() {
ToQueuedTask(task_safety_, [ssrc, packet_counter, this]() {
RtcpPacketTypesCounterUpdated(ssrc, packet_counter);
}));
return;
Expand Down Expand Up @@ -810,7 +808,7 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame,
// "com.apple.coremedia.decompressionsession.clientcallback"
VideoFrameMetaData meta(frame, clock_->CurrentTime());
worker_thread_->PostTask(ToQueuedTask(
task_safety_flag_, [meta, qp, decode_time_ms, content_type, this]() {
task_safety_, [meta, qp, decode_time_ms, content_type, this]() {
OnDecodedFrame(meta, qp, decode_time_ms, content_type);
}));
}
Expand Down Expand Up @@ -936,8 +934,8 @@ void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms,
RTC_DCHECK_RUN_ON(&incoming_render_queue_);
int64_t now_ms = clock_->TimeInMilliseconds();
worker_thread_->PostTask(
ToQueuedTask(task_safety_flag_, [video_playout_ntp_ms, sync_offset_ms,
estimated_freq_khz, now_ms, this]() {
ToQueuedTask(task_safety_, [video_playout_ntp_ms, sync_offset_ms,
estimated_freq_khz, now_ms, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
sync_offset_counter_.Add(std::abs(sync_offset_ms));
stats_.sync_offset_ms = sync_offset_ms;
Expand Down Expand Up @@ -990,24 +988,22 @@ void ReceiveStatisticsProxy::OnCompleteFrame(bool is_keyframe,

void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(
ToQueuedTask(task_safety_flag_, [frames_dropped, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.frames_dropped += frames_dropped;
}));
worker_thread_->PostTask(ToQueuedTask(task_safety_, [frames_dropped, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
stats_.frames_dropped += frames_dropped;
}));
}

void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
RTC_DCHECK_RUN_ON(&decode_queue_);
worker_thread_->PostTask(
ToQueuedTask(task_safety_flag_, [codec_type, qp, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
last_codec_type_ = codec_type;
if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
qp_counters_.vp8.Add(qp);
qp_sample_.Add(qp);
}
}));
worker_thread_->PostTask(ToQueuedTask(task_safety_, [codec_type, qp, this]() {
RTC_DCHECK_RUN_ON(&main_thread_);
last_codec_type_ = codec_type;
if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
qp_counters_.vp8.Add(qp);
qp_sample_.Add(qp);
}
}));
}

void ReceiveStatisticsProxy::OnStreamInactive() {
Expand Down
3 changes: 1 addition & 2 deletions video/receive_statistics_proxy2.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
// methods are invoked on such as GetStats().
TaskQueueBase* const worker_thread_;

PendingTaskSafetyFlag::Pointer task_safety_flag_ =
PendingTaskSafetyFlag::Create();
ScopedTaskSafety task_safety_;

SequenceChecker decode_queue_;
rtc::ThreadChecker main_thread_;
Expand Down
14 changes: 6 additions & 8 deletions video/rtp_streams_synchronizer2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue,

RtpStreamsSynchronizer::~RtpStreamsSynchronizer() {
RTC_DCHECK_RUN_ON(&main_checker_);
task_safety_flag_->SetNotAlive();
}

void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) {
Expand Down Expand Up @@ -85,13 +84,12 @@ void RtpStreamsSynchronizer::QueueTimer() {
}

RTC_DCHECK_LE(delay, kSyncIntervalMs);
task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] {
if (!safety->alive())
return;
RTC_DCHECK_RUN_ON(&main_checker_);
timer_running_ = false;
UpdateDelay();
}),
task_queue_->PostDelayedTask(ToQueuedTask(task_safety_,
[this] {
RTC_DCHECK_RUN_ON(&main_checker_);
timer_running_ = false;
UpdateDelay();
}),
delay);
}

Expand Down
Loading

0 comments on commit a98cea8

Please sign in to comment.