dcsctp: Add metrics support
To support implementing RTCSctpTransportStats, a few metrics are needed. Some more were added that are useful for metric collection in SFUs. Bug: webrtc:13052 Change-Id: Idafd49e1084922d01d3e6c5860715f63aea08b7d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228243 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Reviewed-by: Florent Castelli <orphis@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34708}
This commit is contained in:
parent
1118ebac0a
commit
d4716eaf60
@ -155,6 +155,43 @@ inline constexpr absl::string_view ToString(ResetStreamsStatus error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Tracked metrics, which is the return value of GetMetrics. Optional members
|
||||
// will be unset when they are not yet known.
|
||||
struct Metrics {
|
||||
// Transmission stats and metrics.
|
||||
|
||||
// Number of packets sent.
|
||||
size_t tx_packets_count = 0;
|
||||
|
||||
// Number of messages requested to be sent.
|
||||
size_t tx_messages_count = 0;
|
||||
|
||||
// The current congestion window (cwnd) in bytes, corresponding to spinfo_cwnd
|
||||
// defined in RFC6458.
|
||||
absl::optional<size_t> cwnd_bytes = absl::nullopt;
|
||||
|
||||
// Smoothed round trip time, corresponding to spinfo_srtt defined in RFC6458.
|
||||
absl::optional<int> srtt_ms = absl::nullopt;
|
||||
|
||||
// Number of data items in the retransmission queue that haven’t been
|
||||
// acked/nacked yet and are in-flight. Corresponding to sstat_unackdata
|
||||
// defined in RFC6458. This may be an approximation when there are messages in
|
||||
// the send queue that haven't been fragmented/packetized yet.
|
||||
size_t unack_data_count = 0;
|
||||
|
||||
// Receive stats and metrics.
|
||||
|
||||
// Number of packets received.
|
||||
size_t rx_packets_count = 0;
|
||||
|
||||
// Number of messages received.
|
||||
size_t rx_messages_count = 0;
|
||||
|
||||
// The peer’s last announced receiver window size, corresponding to
|
||||
// sstat_rwnd defined in RFC6458.
|
||||
absl::optional<uint32_t> peer_rwnd_bytes = absl::nullopt;
|
||||
};
|
||||
|
||||
// Callbacks that the DcSctpSocket will be done synchronously to the owning
|
||||
// client. It is allowed to call back into the library from callbacks that start
|
||||
// with "On". It has been explicitly documented when it's not allowed to call
|
||||
@ -350,6 +387,9 @@ class DcSctpSocketInterface {
|
||||
// OnBufferedAmountLow event. The default value is zero (0).
|
||||
virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
|
||||
size_t bytes) = 0;
|
||||
|
||||
// Retrieves the latest metrics.
|
||||
virtual Metrics GetMetrics() const = 0;
|
||||
};
|
||||
} // namespace dcsctp
|
||||
|
||||
|
||||
@ -57,6 +57,8 @@ class MockDcSctpSocket : public DcSctpSocketInterface {
|
||||
SetBufferedAmountLowThreshold,
|
||||
(StreamID stream_id, size_t bytes),
|
||||
(override));
|
||||
|
||||
MOCK_METHOD(Metrics, GetMetrics, (), (const, override));
|
||||
};
|
||||
|
||||
} // namespace dcsctp
|
||||
|
||||
@ -381,6 +381,7 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message,
|
||||
}
|
||||
|
||||
TimeMs now = callbacks_.TimeMillis();
|
||||
++metrics_.tx_messages_count;
|
||||
send_queue_.Add(now, std::move(message), send_options);
|
||||
if (tcb_ != nullptr) {
|
||||
tcb_->SendBufferedPackets(now);
|
||||
@ -456,6 +457,26 @@ void DcSctpSocket::SetBufferedAmountLowThreshold(StreamID stream_id,
|
||||
send_queue_.SetBufferedAmountLowThreshold(stream_id, bytes);
|
||||
}
|
||||
|
||||
Metrics DcSctpSocket::GetMetrics() const {
|
||||
Metrics metrics = metrics_;
|
||||
|
||||
if (tcb_ != nullptr) {
|
||||
// Update the metrics with some stats that are extracted from
|
||||
// sub-components.
|
||||
metrics.cwnd_bytes = tcb_->cwnd();
|
||||
metrics.srtt_ms = tcb_->current_srtt().value();
|
||||
size_t packet_payload_size =
|
||||
options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize;
|
||||
metrics.unack_data_count =
|
||||
tcb_->retransmission_queue().outstanding_items() +
|
||||
(send_queue_.total_buffered_amount() + packet_payload_size - 1) /
|
||||
packet_payload_size;
|
||||
metrics.peer_rwnd_bytes = tcb_->retransmission_queue().rwnd();
|
||||
}
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
void DcSctpSocket::MaybeSendShutdownOnPacketReceived(const SctpPacket& packet) {
|
||||
if (state_ == State::kShutdownSent) {
|
||||
bool has_data_chunk =
|
||||
@ -588,6 +609,8 @@ void DcSctpSocket::HandleTimeout(TimeoutID timeout_id) {
|
||||
}
|
||||
|
||||
void DcSctpSocket::ReceivePacket(rtc::ArrayView<const uint8_t> data) {
|
||||
++metrics_.rx_packets_count;
|
||||
|
||||
if (packet_observer_ != nullptr) {
|
||||
packet_observer_->OnReceivedPacket(callbacks_.TimeMillis(), data);
|
||||
}
|
||||
@ -834,6 +857,7 @@ void DcSctpSocket::SendPacket(SctpPacket::Builder& builder) {
|
||||
if (packet_observer_ != nullptr) {
|
||||
packet_observer_->OnSentPacket(callbacks_.TimeMillis(), payload);
|
||||
}
|
||||
++metrics_.tx_packets_count;
|
||||
callbacks_.SendPacket(payload);
|
||||
}
|
||||
|
||||
@ -1267,6 +1291,7 @@ void DcSctpSocket::HandleCookieAck(
|
||||
void DcSctpSocket::DeliverReassembledMessages() {
|
||||
if (tcb_->reassembly_queue().HasMessages()) {
|
||||
for (auto& message : tcb_->reassembly_queue().FlushMessages()) {
|
||||
++metrics_.rx_messages_count;
|
||||
callbacks_.OnMessageReceived(std::move(message));
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,6 +96,8 @@ class DcSctpSocket : public DcSctpSocketInterface {
|
||||
size_t buffered_amount(StreamID stream_id) const override;
|
||||
size_t buffered_amount_low_threshold(StreamID stream_id) const override;
|
||||
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
|
||||
Metrics GetMetrics() const override;
|
||||
|
||||
// Returns this socket's verification tag, or zero if not yet connected.
|
||||
VerificationTag verification_tag() const {
|
||||
return tcb_ != nullptr ? tcb_->my_verification_tag() : VerificationTag(0);
|
||||
@ -245,6 +247,7 @@ class DcSctpSocket : public DcSctpSocketInterface {
|
||||
|
||||
const std::string log_prefix_;
|
||||
const std::unique_ptr<PacketObserver> packet_observer_;
|
||||
Metrics metrics_;
|
||||
DcSctpOptions options_;
|
||||
|
||||
// Enqueues callbacks and dispatches them just before returning to the caller.
|
||||
|
||||
@ -1608,5 +1608,110 @@ TEST_F(DcSctpSocketTest, TriggersOnTotalBufferAmountLowWhenCrossingThreshold) {
|
||||
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
|
||||
}
|
||||
|
||||
TEST_F(DcSctpSocketTest, InitialMetricsAreZeroed) {
|
||||
Metrics metrics = sock_a_.GetMetrics();
|
||||
EXPECT_EQ(metrics.tx_packets_count, 0u);
|
||||
EXPECT_EQ(metrics.tx_messages_count, 0u);
|
||||
EXPECT_EQ(metrics.cwnd_bytes.has_value(), false);
|
||||
EXPECT_EQ(metrics.srtt_ms.has_value(), false);
|
||||
EXPECT_EQ(metrics.unack_data_count, 0u);
|
||||
EXPECT_EQ(metrics.rx_packets_count, 0u);
|
||||
EXPECT_EQ(metrics.rx_messages_count, 0u);
|
||||
EXPECT_EQ(metrics.peer_rwnd_bytes.has_value(), false);
|
||||
}
|
||||
|
||||
TEST_F(DcSctpSocketTest, RxAndTxPacketMetricsIncrease) {
|
||||
ConnectSockets();
|
||||
|
||||
const size_t initial_a_rwnd = options_.max_receiver_window_buffer_size *
|
||||
ReassemblyQueue::kHighWatermarkLimit;
|
||||
|
||||
EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 2u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 2u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 0u);
|
||||
EXPECT_EQ(*sock_a_.GetMetrics().cwnd_bytes,
|
||||
options_.cwnd_mtus_initial * options_.mtu);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
|
||||
|
||||
EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 2u);
|
||||
EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 0u);
|
||||
|
||||
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), kSendOptions);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 1u);
|
||||
|
||||
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
|
||||
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // SACK
|
||||
EXPECT_EQ(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
|
||||
|
||||
EXPECT_TRUE(cb_z_.ConsumeReceivedMessage().has_value());
|
||||
|
||||
EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 3u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 3u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 1u);
|
||||
|
||||
EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 3u);
|
||||
EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 1u);
|
||||
|
||||
// Send one more (large - fragmented), and receive the delayed SACK.
|
||||
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
|
||||
std::vector<uint8_t>(options_.mtu * 2 + 1)),
|
||||
kSendOptions);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 3u);
|
||||
|
||||
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
|
||||
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
|
||||
|
||||
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // SACK
|
||||
EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 1u);
|
||||
EXPECT_GT(*sock_a_.GetMetrics().peer_rwnd_bytes, 0u);
|
||||
EXPECT_LT(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
|
||||
|
||||
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
|
||||
|
||||
EXPECT_TRUE(cb_z_.ConsumeReceivedMessage().has_value());
|
||||
|
||||
EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 6u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 4u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 2u);
|
||||
|
||||
EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 6u);
|
||||
EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 2u);
|
||||
|
||||
// Delayed sack
|
||||
AdvanceTime(options_.delayed_ack_max_timeout);
|
||||
RunTimers();
|
||||
|
||||
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // SACK
|
||||
EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
|
||||
EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 5u);
|
||||
EXPECT_EQ(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
|
||||
}
|
||||
|
||||
TEST_F(DcSctpSocketTest, UnackDataAlsoIncludesSendQueue) {
|
||||
ConnectSockets();
|
||||
|
||||
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
|
||||
std::vector<uint8_t>(kLargeMessageSize)),
|
||||
kSendOptions);
|
||||
size_t payload_bytes =
|
||||
options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize;
|
||||
|
||||
size_t expected_sent_packets = options_.cwnd_mtus_initial;
|
||||
|
||||
size_t expected_queued_bytes =
|
||||
kLargeMessageSize - expected_sent_packets * payload_bytes;
|
||||
|
||||
size_t expected_queued_packets = expected_queued_bytes / payload_bytes;
|
||||
|
||||
// Due to alignment, padding etc, it's hard to calculate the exact number, but
|
||||
// it should be in this range.
|
||||
EXPECT_GE(sock_a_.GetMetrics().unack_data_count,
|
||||
expected_sent_packets + expected_queued_packets);
|
||||
|
||||
EXPECT_LE(sock_a_.GetMetrics().unack_data_count,
|
||||
expected_sent_packets + expected_queued_packets + 2);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace dcsctp
|
||||
|
||||
@ -130,6 +130,8 @@ class TransmissionControlBlock : public Context {
|
||||
RetransmissionQueue& retransmission_queue() { return retransmission_queue_; }
|
||||
StreamResetHandler& stream_reset_handler() { return stream_reset_handler_; }
|
||||
HeartbeatHandler& heartbeat_handler() { return heartbeat_handler_; }
|
||||
size_t cwnd() const { return retransmission_queue_.cwnd(); }
|
||||
DurationMs current_srtt() const { return rto_.srtt(); }
|
||||
|
||||
// Returns this socket's verification tag, set in all packet headers.
|
||||
VerificationTag my_verification_tag() const { return my_verification_tag_; }
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user