diff --git a/rtc_base/swap_queue.h b/rtc_base/swap_queue.h index c3246dba94..8dd2a286dd 100644 --- a/rtc_base/swap_queue.h +++ b/rtc_base/swap_queue.h @@ -12,14 +12,12 @@ #define RTC_BASE_SWAP_QUEUE_H_ #include +#include #include #include #include "rtc_base/checks.h" -#include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" #include "rtc_base/system/unused.h" -#include "rtc_base/thread_annotations.h" namespace webrtc { @@ -42,11 +40,11 @@ class SwapQueueItemVerifier { bool operator()(const T& t) const { return QueueItemVerifierFunction(t); } }; -// This class is a fixed-size queue. A producer calls Insert() to insert -// an element of type T at the back of the queue, and a consumer calls -// Remove() to remove an element from the front of the queue. It's safe -// for the producer(s) and the consumer(s) to access the queue -// concurrently, from different threads. +// This class is a fixed-size queue. A single producer calls Insert() to insert +// an element of type T at the back of the queue, and a single consumer calls +// Remove() to remove an element from the front of the queue. It's safe for the +// producer and the consumer to access the queue concurrently, from different +// threads. // // To avoid the construction, copying, and destruction of Ts that a naive // queue implementation would require, for each "full" T passed from @@ -106,12 +104,20 @@ class SwapQueue { RTC_DCHECK(VerifyQueueSlots()); } - // Resets the queue to have zero content wile maintaining the queue size. + // Resets the queue to have zero content while maintaining the queue size. + // Just like Remove(), this can only be called (safely) from the + // consumer. void Clear() { - rtc::CritScope cs(&crit_queue_); - next_write_index_ = 0; - next_read_index_ = 0; - num_elements_ = 0; + // Drop all non-empty elements by resetting num_elements_ and incrementing + // next_read_index_ by the previous value of num_elements_. Relaxed memory + // ordering is sufficient since the dropped elements are not accessed. + next_read_index_ += std::atomic_exchange_explicit( + &num_elements_, size_t{0}, std::memory_order_relaxed); + if (next_read_index_ >= queue_.size()) { + next_read_index_ -= queue_.size(); + } + + RTC_DCHECK_LT(next_read_index_, queue_.size()); } // Inserts a "full" T at the back of the queue by swapping *input with an @@ -123,26 +129,33 @@ class SwapQueue { bool Insert(T* input) RTC_WARN_UNUSED_RESULT { RTC_DCHECK(input); - rtc::CritScope cs(&crit_queue_); - RTC_DCHECK(queue_item_verifier_(*input)); - if (num_elements_ == queue_.size()) { + // Load the value of num_elements_. Acquire memory ordering prevents reads + // and writes to queue_[next_write_index_] to be reordered to before the + // load. (That element might be accessed by a concurrent call to Remove() + // until the load finishes.) + if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) == + queue_.size()) { return false; } - using std::swap; - swap(*input, queue_[next_write_index_]); + std::swap(*input, queue_[next_write_index_]); + + // Increment the value of num_elements_ to account for the inserted element. + // Release memory ordering prevents the reads and writes to + // queue_[next_write_index_] to be reordered to after the increment. (Once + // the increment has finished, Remove() might start accessing that element.) + const size_t old_num_elements = std::atomic_fetch_add_explicit( + &num_elements_, size_t{1}, std::memory_order_release); ++next_write_index_; if (next_write_index_ == queue_.size()) { next_write_index_ = 0; } - ++num_elements_; - RTC_DCHECK_LT(next_write_index_, queue_.size()); - RTC_DCHECK_LE(num_elements_, queue_.size()); + RTC_DCHECK_LT(old_num_elements, queue_.size()); return true; } @@ -156,56 +169,66 @@ class SwapQueue { bool Remove(T* output) RTC_WARN_UNUSED_RESULT { RTC_DCHECK(output); - rtc::CritScope cs(&crit_queue_); - RTC_DCHECK(queue_item_verifier_(*output)); - if (num_elements_ == 0) { + // Load the value of num_elements_. Acquire memory ordering prevents reads + // and writes to queue_[next_read_index_] to be reordered to before the + // load. (That element might be accessed by a concurrent call to Insert() + // until the load finishes.) + if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) == + 0) { return false; } - using std::swap; - swap(*output, queue_[next_read_index_]); + std::swap(*output, queue_[next_read_index_]); + + // Decrement the value of num_elements_ to account for the removed element. + // Release memory ordering prevents the reads and writes to + // queue_[next_write_index_] to be reordered to after the decrement. (Once + // the decrement has finished, Insert() might start accessing that element.) + std::atomic_fetch_sub_explicit(&num_elements_, size_t{1}, + std::memory_order_release); ++next_read_index_; if (next_read_index_ == queue_.size()) { next_read_index_ = 0; } - --num_elements_; - RTC_DCHECK_LT(next_read_index_, queue_.size()); - RTC_DCHECK_LE(num_elements_, queue_.size()); return true; } private: - // Verify that the queue slots complies with the ItemVerifier test. + // Verify that the queue slots complies with the ItemVerifier test. This + // function is not thread-safe and can only be used in the constructors. bool VerifyQueueSlots() { - rtc::CritScope cs(&crit_queue_); for (const auto& v : queue_) { RTC_DCHECK(queue_item_verifier_(v)); } return true; } - rtc::CriticalSection crit_queue_; - // TODO(peah): Change this to use std::function() once we can use C++11 std // lib. - QueueItemVerifier queue_item_verifier_ RTC_GUARDED_BY(crit_queue_); + QueueItemVerifier queue_item_verifier_; - // (next_read_index_ + num_elements_) % queue_.size() = - // next_write_index_ - size_t next_write_index_ RTC_GUARDED_BY(crit_queue_) = 0; - size_t next_read_index_ RTC_GUARDED_BY(crit_queue_) = 0; - size_t num_elements_ RTC_GUARDED_BY(crit_queue_) = 0; + // Only accessed by the single producer. + size_t next_write_index_ = 0; - // queue_.size() is constant. - std::vector queue_ RTC_GUARDED_BY(crit_queue_); + // Only accessed by the single consumer. + size_t next_read_index_ = 0; - RTC_DISALLOW_COPY_AND_ASSIGN(SwapQueue); + // Accessed by both the producer and the consumer and used for synchronization + // between them. + std::atomic num_elements_{0}; + + // The elements of the queue are acced by both the producer and the consumer, + // mediated by num_elements_. queue_.size() is constant. + std::vector queue_; + + SwapQueue(const SwapQueue&) = delete; + SwapQueue& operator=(const SwapQueue&) = delete; }; } // namespace webrtc