// snowstorm/net/metrics.rs

1use std::{collections::HashMap, pin::Pin, task::Poll};
2
3use futures::ready;
4use libp2p::{metrics::Registry, PeerId, Stream};
5use log::debug;
6pub use prometheus_client::encoding::protobuf::encode;
7use prometheus_client::{
8    encoding::{protobuf, EncodeLabelSet, EncodeLabelValue},
9    metrics::{counter::Counter, family::Family},
10    registry::Unit,
11};
12
13use serde::{Deserialize, Serialize};
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
16
17use super::connections::ConnectionsMap;
18
19#[derive(Clone, Debug, Default, Serialize, Deserialize)]
20pub struct BandwidthSummary {
21    pub inbound: u64,
22    pub outbound: u64,
23}
24
25impl BandwidthSummary {
26    pub fn inbound(bytes: u64) -> Self {
27        BandwidthSummary {
28            inbound: bytes,
29            outbound: 0,
30        }
31    }
32
33    pub fn outbound(bytes: u64) -> Self {
34        BandwidthSummary {
35            inbound: 0,
36            outbound: bytes,
37        }
38    }
39}
40
41#[derive(Clone, Debug, Default, Serialize, Deserialize)]
42pub struct ConnectionBandwidthRecord {
43    pub peer_id: String,
44    pub peer_role: String,
45    pub summary: BandwidthSummary,
46}
47
48#[derive(Clone, Debug, Default, Serialize, Deserialize)]
49pub struct DetailedBandwidthSummary {
50    pub records: Vec<ConnectionBandwidthRecord>,
51}
52
53fn find_label<'a>(
54    metrics: &'a protobuf::openmetrics_data_model::Metric,
55    label: &str,
56) -> Option<&'a str> {
57    metrics
58        .labels
59        .iter()
60        .find(|l| l.name == label)
61        .map(|l| l.value.as_str())
62}
63
64pub fn sample_snowstorm_bandwidth(
65    registry: &Registry,
66) -> Result<DetailedBandwidthSummary, std::fmt::Error> {
67    let mut result = DetailedBandwidthSummary::default();
68
69    let metrics = protobuf::encode(registry)?;
70    let mut records: HashMap<(&str, &str), BandwidthSummary> = HashMap::new();
71
72    for family in &metrics.metric_families {
73        if family.name != "snowstorm_peer_bandwidth" {
74            continue;
75        }
76
77        for metric in &family.metrics {
78            let peer_id = find_label(metric, "peer_id");
79            let peer_role = find_label(metric, "peer_role");
80            let direction = find_label(metric, "direction");
81
82            if peer_id.is_none() || peer_role.is_none() || direction.is_none() {
83                debug!(
84                    "DEBUG: Missing peer_id or peer_role in metric: {:?}",
85                    metric
86                );
87                continue;
88            }
89
90            let peer_id = peer_id.unwrap();
91            let peer_role = peer_role.unwrap();
92            let direction = direction.unwrap();
93
94            let entry = records.entry((peer_id, peer_role));
95
96            let total = metric
97                .metric_points
98                .iter()
99                .filter_map(|point| match &point.value {
100                    Some(protobuf::openmetrics_data_model::metric_point::Value::CounterValue(
101                        counter,
102                    )) => match counter.total {
103                        Some(protobuf::openmetrics_data_model::counter_value::Total::IntValue(
104                            val,
105                        )) => Some(val),
106                        _ => None,
107                    },
108                    _ => None,
109                })
110                .sum::<u64>();
111
112            let update = if direction == "Inbound" {
113                BandwidthSummary::inbound(total)
114            } else if direction == "Outbound" {
115                BandwidthSummary::outbound(total)
116            } else {
117                debug!("DEBUG: Unknown direction in metric: {:?}", metric);
118                continue;
119            };
120
121            entry
122                .and_modify(|v| {
123                    v.inbound += update.inbound;
124                    v.outbound += update.outbound;
125                })
126                .or_insert(update);
127        }
128    }
129
130    for ((peer_id, peer_role), summary) in records {
131        result.records.push(ConnectionBandwidthRecord {
132            peer_id: peer_id.to_string(),
133            peer_role: peer_role.to_string(),
134            summary,
135        });
136    }
137
138    Ok(result)
139}
140
141pub fn sample_libp2p_bandwidth(
142    registry: &mut Registry,
143) -> Result<BandwidthSummary, std::fmt::Error> {
144    let mut summary = BandwidthSummary::default();
145    let metrics = protobuf::encode(registry)?;
146
147    for family in &metrics.metric_families {
148        // Name comes from `registry.sub_registry_with_prefix("libp2p").register_with_unit("bandwidth", ...)` in libp2p
149        if family.name != "libp2p_bandwidth" {
150            continue;
151        }
152
153        for metric in &family.metrics {
154            let direction = metric
155                .labels
156                .iter()
157                .find(|l| l.name == "direction")
158                .map(|l| l.value.as_str());
159
160            let total = metric
161                .metric_points
162                .iter()
163                .filter_map(|point| match &point.value {
164                    Some(protobuf::openmetrics_data_model::metric_point::Value::CounterValue(
165                        counter,
166                    )) => match counter.total {
167                        Some(protobuf::openmetrics_data_model::counter_value::Total::IntValue(
168                            val,
169                        )) => Some(val),
170                        _ => None,
171                    },
172                    _ => None,
173                })
174                .sum::<u64>();
175
176            match direction {
177                Some("Inbound") => summary.inbound += total,
178                Some("Outbound") => summary.outbound += total,
179                _ => debug!(
180                    "DEBUG: Unknown or missing direction in metric: {:?}",
181                    metric
182                ),
183            }
184        }
185    }
186
187    Ok(summary)
188}
189
190#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
191pub struct Labels {
192    peer_id: String,
193    own_peer_id: String,
194    direction: Direction,
195    peer_role: String,
196    protocol_stack: Option<String>,
197}
198
199#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelValue, Debug)]
200pub enum Direction {
201    Inbound,
202    Outbound,
203}
204
205pub struct Metrics {
206    pub peer_bandwidth: Family<Labels, Counter>,
207}
208
209impl Metrics {
210    pub fn new(registry: &mut Registry) -> Self {
211        let metrics = Metrics {
212            peer_bandwidth: Family::<Labels, Counter>::default(),
213        };
214
215        registry
216            .sub_registry_with_prefix("snowstorm")
217            .register_with_unit(
218                "peer_bandwidth",
219                "Bandwidth usage by own peer, peer, role and direction",
220                Unit::Bytes,
221                metrics.peer_bandwidth.clone(),
222            );
223
224        metrics
225    }
226}
227
228pub struct InstrumentedStream {
229    peer_bandwidth_inbound: Counter,
230    peer_bandwidth_outbound: Counter,
231    inner: Compat<Stream>,
232}
233
234impl AsyncRead for InstrumentedStream {
235    fn poll_read(
236        self: std::pin::Pin<&mut Self>,
237        cx: &mut std::task::Context<'_>,
238        buf: &mut tokio::io::ReadBuf<'_>,
239    ) -> Poll<std::io::Result<()>> {
240        let counter = self.peer_bandwidth_inbound.clone();
241        let bytes_pre = buf.filled().len();
242
243        ready!(Pin::new(&mut self.get_mut().inner).poll_read(cx, buf))?;
244
245        let bytes_read = buf.filled().len() - bytes_pre;
246        counter.inc_by(u64::try_from(bytes_read).unwrap_or(u64::max_value()));
247
248        Poll::Ready(Ok(()))
249    }
250}
251
252impl AsyncWrite for InstrumentedStream {
253    fn poll_write(
254        self: std::pin::Pin<&mut Self>,
255        cx: &mut std::task::Context,
256        buf: &[u8],
257    ) -> Poll<std::io::Result<usize>> {
258        let counter = self.peer_bandwidth_outbound.clone();
259        let bytes = ready!(Pin::new(&mut self.get_mut().inner).poll_write(cx, buf))?;
260        counter.inc_by(u64::try_from(bytes).unwrap_or(u64::max_value()));
261
262        Poll::Ready(Ok(bytes))
263    }
264
265    fn poll_write_vectored(
266        self: Pin<&mut Self>,
267        cx: &mut std::task::Context<'_>,
268        bufs: &[std::io::IoSlice<'_>],
269    ) -> Poll<Result<usize, std::io::Error>> {
270        let counter = self.peer_bandwidth_outbound.clone();
271        let bytes = ready!(Pin::new(&mut self.get_mut().inner).poll_write_vectored(cx, bufs))?;
272        counter.inc_by(u64::try_from(bytes).unwrap_or(u64::max_value()));
273
274        Poll::Ready(Ok(bytes))
275    }
276
277    fn poll_flush(
278        self: std::pin::Pin<&mut Self>,
279        cx: &mut std::task::Context,
280    ) -> Poll<std::io::Result<()>> {
281        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
282    }
283
284    fn poll_shutdown(
285        self: std::pin::Pin<&mut Self>,
286        cx: &mut std::task::Context,
287    ) -> Poll<std::io::Result<()>> {
288        Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
289    }
290}
291
292pub fn instrument(
293    stream: Stream,
294    own_peer_id: &PeerId,
295    peer_id: &PeerId,
296    role: &str,
297    metrics: &Metrics,
298    connections_map: &ConnectionsMap,
299) -> InstrumentedStream {
300    let protocol_stack = connections_map.get(peer_id);
301
302    let peer_bandwidth_inbound = metrics
303        .peer_bandwidth
304        .get_or_create(&Labels {
305            own_peer_id: own_peer_id.to_string(),
306            peer_id: peer_id.to_string(),
307            direction: Direction::Inbound,
308            peer_role: role.to_owned(),
309            protocol_stack: protocol_stack.clone(),
310        })
311        .clone();
312
313    let peer_bandwidth_outbound = metrics
314        .peer_bandwidth
315        .get_or_create(&Labels {
316            own_peer_id: own_peer_id.to_string(),
317            peer_id: peer_id.to_string(),
318            direction: Direction::Outbound,
319            peer_role: role.to_owned(),
320            protocol_stack: protocol_stack,
321        })
322        .clone();
323
324    InstrumentedStream {
325        peer_bandwidth_inbound,
326        peer_bandwidth_outbound,
327        inner: stream.compat(),
328    }
329}