1use std::{collections::HashMap, pin::Pin, task::Poll};
2
3use futures::ready;
4use libp2p::{metrics::Registry, PeerId, Stream};
5use log::debug;
6pub use prometheus_client::encoding::protobuf::encode;
7use prometheus_client::{
8 encoding::{protobuf, EncodeLabelSet, EncodeLabelValue},
9 metrics::{counter::Counter, family::Family},
10 registry::Unit,
11};
12
13use serde::{Deserialize, Serialize};
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
16
17use super::connections::ConnectionsMap;
18
19#[derive(Clone, Debug, Default, Serialize, Deserialize)]
20pub struct BandwidthSummary {
21 pub inbound: u64,
22 pub outbound: u64,
23}
24
25impl BandwidthSummary {
26 pub fn inbound(bytes: u64) -> Self {
27 BandwidthSummary {
28 inbound: bytes,
29 outbound: 0,
30 }
31 }
32
33 pub fn outbound(bytes: u64) -> Self {
34 BandwidthSummary {
35 inbound: 0,
36 outbound: bytes,
37 }
38 }
39}
40
41#[derive(Clone, Debug, Default, Serialize, Deserialize)]
42pub struct ConnectionBandwidthRecord {
43 pub peer_id: String,
44 pub peer_role: String,
45 pub summary: BandwidthSummary,
46}
47
48#[derive(Clone, Debug, Default, Serialize, Deserialize)]
49pub struct DetailedBandwidthSummary {
50 pub records: Vec<ConnectionBandwidthRecord>,
51}
52
53fn find_label<'a>(
54 metrics: &'a protobuf::openmetrics_data_model::Metric,
55 label: &str,
56) -> Option<&'a str> {
57 metrics
58 .labels
59 .iter()
60 .find(|l| l.name == label)
61 .map(|l| l.value.as_str())
62}
63
64pub fn sample_snowstorm_bandwidth(
65 registry: &Registry,
66) -> Result<DetailedBandwidthSummary, std::fmt::Error> {
67 let mut result = DetailedBandwidthSummary::default();
68
69 let metrics = protobuf::encode(registry)?;
70 let mut records: HashMap<(&str, &str), BandwidthSummary> = HashMap::new();
71
72 for family in &metrics.metric_families {
73 if family.name != "snowstorm_peer_bandwidth" {
74 continue;
75 }
76
77 for metric in &family.metrics {
78 let peer_id = find_label(metric, "peer_id");
79 let peer_role = find_label(metric, "peer_role");
80 let direction = find_label(metric, "direction");
81
82 if peer_id.is_none() || peer_role.is_none() || direction.is_none() {
83 debug!(
84 "DEBUG: Missing peer_id or peer_role in metric: {:?}",
85 metric
86 );
87 continue;
88 }
89
90 let peer_id = peer_id.unwrap();
91 let peer_role = peer_role.unwrap();
92 let direction = direction.unwrap();
93
94 let entry = records.entry((peer_id, peer_role));
95
96 let total = metric
97 .metric_points
98 .iter()
99 .filter_map(|point| match &point.value {
100 Some(protobuf::openmetrics_data_model::metric_point::Value::CounterValue(
101 counter,
102 )) => match counter.total {
103 Some(protobuf::openmetrics_data_model::counter_value::Total::IntValue(
104 val,
105 )) => Some(val),
106 _ => None,
107 },
108 _ => None,
109 })
110 .sum::<u64>();
111
112 let update = if direction == "Inbound" {
113 BandwidthSummary::inbound(total)
114 } else if direction == "Outbound" {
115 BandwidthSummary::outbound(total)
116 } else {
117 debug!("DEBUG: Unknown direction in metric: {:?}", metric);
118 continue;
119 };
120
121 entry
122 .and_modify(|v| {
123 v.inbound += update.inbound;
124 v.outbound += update.outbound;
125 })
126 .or_insert(update);
127 }
128 }
129
130 for ((peer_id, peer_role), summary) in records {
131 result.records.push(ConnectionBandwidthRecord {
132 peer_id: peer_id.to_string(),
133 peer_role: peer_role.to_string(),
134 summary,
135 });
136 }
137
138 Ok(result)
139}
140
141pub fn sample_libp2p_bandwidth(
142 registry: &mut Registry,
143) -> Result<BandwidthSummary, std::fmt::Error> {
144 let mut summary = BandwidthSummary::default();
145 let metrics = protobuf::encode(registry)?;
146
147 for family in &metrics.metric_families {
148 if family.name != "libp2p_bandwidth" {
150 continue;
151 }
152
153 for metric in &family.metrics {
154 let direction = metric
155 .labels
156 .iter()
157 .find(|l| l.name == "direction")
158 .map(|l| l.value.as_str());
159
160 let total = metric
161 .metric_points
162 .iter()
163 .filter_map(|point| match &point.value {
164 Some(protobuf::openmetrics_data_model::metric_point::Value::CounterValue(
165 counter,
166 )) => match counter.total {
167 Some(protobuf::openmetrics_data_model::counter_value::Total::IntValue(
168 val,
169 )) => Some(val),
170 _ => None,
171 },
172 _ => None,
173 })
174 .sum::<u64>();
175
176 match direction {
177 Some("Inbound") => summary.inbound += total,
178 Some("Outbound") => summary.outbound += total,
179 _ => debug!(
180 "DEBUG: Unknown or missing direction in metric: {:?}",
181 metric
182 ),
183 }
184 }
185 }
186
187 Ok(summary)
188}
189
190#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
191pub struct Labels {
192 peer_id: String,
193 own_peer_id: String,
194 direction: Direction,
195 peer_role: String,
196 protocol_stack: Option<String>,
197}
198
199#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelValue, Debug)]
200pub enum Direction {
201 Inbound,
202 Outbound,
203}
204
205pub struct Metrics {
206 pub peer_bandwidth: Family<Labels, Counter>,
207}
208
209impl Metrics {
210 pub fn new(registry: &mut Registry) -> Self {
211 let metrics = Metrics {
212 peer_bandwidth: Family::<Labels, Counter>::default(),
213 };
214
215 registry
216 .sub_registry_with_prefix("snowstorm")
217 .register_with_unit(
218 "peer_bandwidth",
219 "Bandwidth usage by own peer, peer, role and direction",
220 Unit::Bytes,
221 metrics.peer_bandwidth.clone(),
222 );
223
224 metrics
225 }
226}
227
228pub struct InstrumentedStream {
229 peer_bandwidth_inbound: Counter,
230 peer_bandwidth_outbound: Counter,
231 inner: Compat<Stream>,
232}
233
234impl AsyncRead for InstrumentedStream {
235 fn poll_read(
236 self: std::pin::Pin<&mut Self>,
237 cx: &mut std::task::Context<'_>,
238 buf: &mut tokio::io::ReadBuf<'_>,
239 ) -> Poll<std::io::Result<()>> {
240 let counter = self.peer_bandwidth_inbound.clone();
241 let bytes_pre = buf.filled().len();
242
243 ready!(Pin::new(&mut self.get_mut().inner).poll_read(cx, buf))?;
244
245 let bytes_read = buf.filled().len() - bytes_pre;
246 counter.inc_by(u64::try_from(bytes_read).unwrap_or(u64::max_value()));
247
248 Poll::Ready(Ok(()))
249 }
250}
251
252impl AsyncWrite for InstrumentedStream {
253 fn poll_write(
254 self: std::pin::Pin<&mut Self>,
255 cx: &mut std::task::Context,
256 buf: &[u8],
257 ) -> Poll<std::io::Result<usize>> {
258 let counter = self.peer_bandwidth_outbound.clone();
259 let bytes = ready!(Pin::new(&mut self.get_mut().inner).poll_write(cx, buf))?;
260 counter.inc_by(u64::try_from(bytes).unwrap_or(u64::max_value()));
261
262 Poll::Ready(Ok(bytes))
263 }
264
265 fn poll_write_vectored(
266 self: Pin<&mut Self>,
267 cx: &mut std::task::Context<'_>,
268 bufs: &[std::io::IoSlice<'_>],
269 ) -> Poll<Result<usize, std::io::Error>> {
270 let counter = self.peer_bandwidth_outbound.clone();
271 let bytes = ready!(Pin::new(&mut self.get_mut().inner).poll_write_vectored(cx, bufs))?;
272 counter.inc_by(u64::try_from(bytes).unwrap_or(u64::max_value()));
273
274 Poll::Ready(Ok(bytes))
275 }
276
277 fn poll_flush(
278 self: std::pin::Pin<&mut Self>,
279 cx: &mut std::task::Context,
280 ) -> Poll<std::io::Result<()>> {
281 Pin::new(&mut self.get_mut().inner).poll_flush(cx)
282 }
283
284 fn poll_shutdown(
285 self: std::pin::Pin<&mut Self>,
286 cx: &mut std::task::Context,
287 ) -> Poll<std::io::Result<()>> {
288 Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
289 }
290}
291
292pub fn instrument(
293 stream: Stream,
294 own_peer_id: &PeerId,
295 peer_id: &PeerId,
296 role: &str,
297 metrics: &Metrics,
298 connections_map: &ConnectionsMap,
299) -> InstrumentedStream {
300 let protocol_stack = connections_map.get(peer_id);
301
302 let peer_bandwidth_inbound = metrics
303 .peer_bandwidth
304 .get_or_create(&Labels {
305 own_peer_id: own_peer_id.to_string(),
306 peer_id: peer_id.to_string(),
307 direction: Direction::Inbound,
308 peer_role: role.to_owned(),
309 protocol_stack: protocol_stack.clone(),
310 })
311 .clone();
312
313 let peer_bandwidth_outbound = metrics
314 .peer_bandwidth
315 .get_or_create(&Labels {
316 own_peer_id: own_peer_id.to_string(),
317 peer_id: peer_id.to_string(),
318 direction: Direction::Outbound,
319 peer_role: role.to_owned(),
320 protocol_stack: protocol_stack,
321 })
322 .clone();
323
324 InstrumentedStream {
325 peer_bandwidth_inbound,
326 peer_bandwidth_outbound,
327 inner: stream.compat(),
328 }
329}