Replacing unique pointer with raw pointer in SSCC checks.

Replacing the unique pointer used for access checks with a raw pointer
pointing to the object owned by the unique pointer. This is to stop
tsan from detecting a race between .get() done on the task queue and
.reset() done in the destructor.

Bug: webrtc:8415
Change-Id: Iae2ea9a2d38f319e73146e6b1e360b11b1708c76
Reviewed-on: https://webrtc-review.googlesource.com/62560
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#22492}
This commit is contained in:
Sebastian Jansson 2018-03-19 14:10:34 +01:00 committed by Commit Bot
parent 04d49500e2
commit 537012405b
2 changed files with 43 additions and 35 deletions

View File

@ -138,16 +138,16 @@ class SendSideCongestionController
void WaitOnTasksForTest();
private:
void MaybeCreateControllers();
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
void MaybeCreateControllers() RTC_RUN_ON(task_queue_ptr_);
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_ptr_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_ptr_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_ptr_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_ptr_);
void MaybeUpdateOutstandingData();
void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
int64_t now_ms)
RTC_RUN_ON(task_queue_);
RTC_RUN_ON(task_queue_ptr_);
const Clock* const clock_;
// PacedSender is thread safe and doesn't need protection here.
@ -158,35 +158,40 @@ class SendSideCongestionController
const std::unique_ptr<NetworkControllerFactoryInterface> controller_factory_;
const std::unique_ptr<PacerController> pacer_controller_
RTC_GUARDED_BY(task_queue_);
RTC_GUARDED_BY(task_queue_ptr_);
std::unique_ptr<send_side_cc_internal::ControlHandler> control_handler_
RTC_GUARDED_BY(task_queue_);
RTC_GUARDED_BY(task_queue_ptr_);
std::unique_ptr<NetworkControllerInterface> controller_
RTC_GUARDED_BY(task_queue_);
RTC_GUARDED_BY(task_queue_ptr_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_ptr_);
std::map<uint32_t, RTCPReportBlock> last_report_blocks_
RTC_GUARDED_BY(task_queue_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
RTC_GUARDED_BY(task_queue_ptr_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_ptr_);
NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_ptr_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_ptr_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_ptr_);
const bool send_side_bwe_with_overhead_;
// Transport overhead is written by OnNetworkRouteChanged and read by
// AddPacket.
// TODO(srte): Remove atomic when feedback adapter runs on task queue.
std::atomic<size_t> transport_overhead_bytes_per_packet_;
bool network_available_ RTC_GUARDED_BY(task_queue_);
bool network_available_ RTC_GUARDED_BY(task_queue_ptr_);
// Protects access to last_packet_feedback_vector_ in feedback adapter.
// TODO(srte): Remove this checker when feedback adapter runs on task queue.
rtc::RaceChecker worker_race_;
// Caches task_queue_.get(), to avoid racing with destructor.
// Note that this is declared before task_queue_ to ensure that it is not
// invalidated until no more tasks can be running on the task queue.
rtc::TaskQueue* task_queue_ptr_;
// Note that moving ownership of the task queue makes it neccessary to make
// sure that there is no outstanding tasks on it using destructed objects.
// This is currently guranteed by using explicit reset in the destructor of

View File

@ -315,6 +315,7 @@ SendSideCongestionController::SendSideCongestionController(
transport_overhead_bytes_per_packet_(0),
network_available_(false),
task_queue_(MakeUnique<rtc::TaskQueue>("SendSideCCQueue")) {
task_queue_ptr_ = task_queue_.get();
initial_config_.constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
initial_config_.stream_based_config = StreamsConfig();
@ -333,7 +334,6 @@ SendSideCongestionController::SendSideCongestionController(
// bandwidth is set before this class is initialized so the controllers can be
// created in the constructor.
void SendSideCongestionController::MaybeCreateControllers() {
RTC_DCHECK_RUN_ON(task_queue_.get());
if (controller_ || !network_available_ || !observer_)
return;
@ -353,6 +353,9 @@ void SendSideCongestionController::MaybeCreateControllers() {
SendSideCongestionController::~SendSideCongestionController() {
// Must be destructed before any objects used by calls on the task queue.
task_queue_.reset();
// Singe the task queue has been destructed, it is now safe to reset
// task_queue_raw_ which is only used by tasks on the task queue.
task_queue_ptr_ = nullptr;
}
void SendSideCongestionController::RegisterPacketFeedbackObserver(
@ -368,7 +371,7 @@ void SendSideCongestionController::DeRegisterPacketFeedbackObserver(
void SendSideCongestionController::RegisterNetworkObserver(
NetworkChangedObserver* observer) {
task_queue_->PostTask([this, observer]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
MaybeCreateControllers();
@ -381,7 +384,7 @@ void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps,
TargetRateConstraints constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
task_queue_->PostTask([this, constraints, start_bitrate_bps]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
if (controller_) {
controller_->OnTargetRateConstraints(constraints);
} else {
@ -398,7 +401,7 @@ void SendSideCongestionController::SetAllocatedSendBitrateLimits(
int64_t max_total_bitrate_bps) {
task_queue_->PostTask([this, min_send_bitrate_bps, max_padding_bitrate_bps,
max_total_bitrate_bps]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
streams_config_.max_total_allocated_bitrate =
@ -425,7 +428,7 @@ void SendSideCongestionController::OnNetworkRouteChanged(
msg.starting_rate = start_bitrate_bps > 0 ? DataRate::bps(start_bitrate_bps)
: DataRate::kNotInitialized;
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
controller_->OnNetworkRouteChange(msg);
pacer_controller_->OnNetworkRouteChange(msg);
});
@ -438,7 +441,7 @@ bool SendSideCongestionController::AvailableBandwidth(
// running on the task queue.
// TODO(srte): Remove this function when RtpTransportControllerSend stops
// calling it.
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
if (!control_handler_) {
return false;
}
@ -458,7 +461,7 @@ RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() {
void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) {
task_queue_->PostTask([this, enable]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig();
});
@ -482,7 +485,7 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) {
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.network_available = state == kNetworkUp;
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
network_available_ = msg.network_available;
if (controller_) {
controller_->OnNetworkAvailability(msg);
@ -509,7 +512,7 @@ void SendSideCongestionController::OnSentPacket(
msg.size = DataSize::bytes(packet->payload_size);
msg.send_time = Timestamp::ms(packet->send_time_ms);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
if (controller_)
controller_->OnSentPacket(msg);
});
@ -524,7 +527,7 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
report.smoothed = true;
task_queue_->PostTask([this, report]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
if (controller_)
controller_->OnRoundTripTimeUpdate(report);
});
@ -540,7 +543,7 @@ void SendSideCongestionController::Process() {
}
void SendSideCongestionController::StartProcessPeriodicTasks() {
task_queue_->PostDelayedTask(
task_queue_ptr_->PostDelayedTask(
NewPeriodicTask(
rtc::Bind(
&SendSideCongestionController::UpdateControllerWithTimeInterval,
@ -548,7 +551,7 @@ void SendSideCongestionController::StartProcessPeriodicTasks() {
process_interval_.ms()),
process_interval_.ms());
task_queue_->PostDelayedTask(
task_queue_ptr_->PostDelayedTask(
NewPeriodicTask(
rtc::Bind(&SendSideCongestionController::UpdatePacerQueue, this),
PacerQueueUpdateIntervalMs),
@ -605,7 +608,7 @@ void SendSideCongestionController::OnTransportFeedback(
msg.data_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
if (controller_)
controller_->OnTransportPacketsFeedback(msg);
});
@ -617,7 +620,7 @@ void SendSideCongestionController::MaybeUpdateOutstandingData() {
msg.in_flight_data =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
pacer_controller_->OnOutstandingData(msg);
});
}
@ -630,7 +633,7 @@ SendSideCongestionController::GetTransportFeedbackVector() const {
void SendSideCongestionController::PostPeriodicTasksForTest() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
UpdateControllerWithTimeInterval();
UpdatePacerQueue();
});
@ -644,7 +647,7 @@ void SendSideCongestionController::WaitOnTasksForTest() {
void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
task_queue_->PostTask([this, pacing_factor]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
streams_config_.pacing_factor = pacing_factor;
UpdateStreamsConfig();
});
@ -656,7 +659,7 @@ void SendSideCongestionController::OnReceivedEstimatedBitrate(
msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::bps(bitrate);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
if (controller_)
controller_->OnRemoteBitrateReport(msg);
});
@ -667,12 +670,12 @@ void SendSideCongestionController::OnReceivedRtcpReceiverReport(
int64_t rtt_ms,
int64_t now_ms) {
task_queue_->PostTask([this, report_blocks, now_ms]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
});
task_queue_->PostTask([this, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(rtt_ms);