1use super::behaviour::SnowstormBehaviour;
2use crate::{
3 config::Config,
4 net::{get_default_interface, netif::DebouncedWatcher},
5};
6use futures::StreamExt;
7use lazy_static::lazy_static;
8use libp2p::{core::muxing::StreamMuxerBox, multiaddr::Protocol, Multiaddr, Swarm, Transport};
9use libp2p_webrtc as webrtc;
10use log::{debug, info, warn};
11use once_cell::sync::Lazy;
12use parking_lot::RwLock;
13use prometheus_client::registry::Registry;
14use socket2::Socket;
15#[cfg(not(target_os = "windows"))]
16use std::os::fd::{AsFd, AsRawFd};
17use std::{
18 io,
19 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
20 sync::{Arc, Mutex},
21 time::Duration,
22};
23use tokio_util::sync::CancellationToken;
24use transport::signalling::ControlHandle;
25use webrtc::tokio::Certificate;
26
27pub mod auth;
28mod transport;
29
30static OUTBOUND_ADDRS: Lazy<Arc<RwLock<Vec<IpAddr>>>> = Lazy::new(|| Arc::new(RwLock::new(vec![])));
31
32lazy_static! {
33 static ref CONFIG: Mutex<Config> = Mutex::new(Default::default());
34}
35
36pub(super) fn build_swarm(
37 token: CancellationToken,
38 config: Config,
39 registry: &mut std::sync::MutexGuard<'_, Registry>,
40 num_streams: Option<usize>,
41) -> (Swarm<SnowstormBehaviour>, Arc<ControlHandle>) {
42 {
43 let mut config_guard = CONFIG.try_lock().unwrap();
44 *config_guard = config.clone();
45 }
46
47 if !config.outbound_addrs.is_empty() {
48 {
49 let mut addrs = OUTBOUND_ADDRS.write();
50
51 *addrs = config
52 .outbound_addrs
53 .iter()
54 .map(multiaddr_to_ipaddr)
55 .filter_map(Result::ok)
56 .collect();
57 }
58
59 if let Ok(mut set) = DebouncedWatcher::new(Duration::from_millis(500)) {
61 tokio::spawn(async move {
62 loop {
63 tokio::select! {
64 _ = token.cancelled() => {
65 return;
66 }
67 new_outbound_addrs = set.select_next_some() => {
68 info!(
69 "Got IF event: Outbound IPs changed {:?}",
70 new_outbound_addrs
71 );
72 let mut addrs = OUTBOUND_ADDRS.write();
73 *addrs = new_outbound_addrs;
74
75 info!("Updated outbound addresses updated");
76 }
77 }
78 }
79 });
80 } else {
81 warn!("Failed to create debounced IF watcher, using static addresses");
82 }
83 }
84
85 let yamux_config = libp2p::yamux::Config::default();
86 let mut quic_config = libp2p::quic::Config::new(&config.keypair.clone());
87 let mut tcp_config = libp2p::tcp::Config::default();
88
89 match config.outbound_addrs.len() {
90 0 => {
91 warn!("no outbound addresses configured, forced binding disabled");
92 }
93 _ => {
94 tcp_config = tcp_config.pre_connect_cb(tcp_outgoing_cb);
95
96 quic_config.socket_created_cb = Some(udp_protect_outgoing);
97 quic_config.eligible_listener_cb = Some(quic_eligible_listener_cb);
98 quic_config.outgoing_dial_addr_cb = Some(quic_outgoing_dial_addr_cb);
99 }
100 };
101
102 if let Some(num_streams) = num_streams {
103 quic_config.max_concurrent_stream_limit = num_streams as u32;
108 info!("increased max concurrent streams: {}", num_streams);
109 }
110
111 let tcp_transport = libp2p::tcp::tokio::Transport::new(tcp_config.clone())
112 .upgrade(libp2p::core::upgrade::Version::V1Lazy)
113 .authenticate(libp2p::noise::Config::new(&config.keypair).unwrap())
114 .multiplex(yamux_config.clone())
115 .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
116
117 let quic_transport = libp2p::quic::tokio::Transport::new(quic_config.clone())
118 .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));
119
120 let webrtc_transport = webrtc::tokio::Transport::new(
121 config.keypair.clone(),
122 Certificate::generate(&mut rand::rng()).unwrap(),
123 Some(webrtc_bind_addr_cb),
124 Some(udp_protect_outgoing),
125 )
126 .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)))
127 .boxed();
128
129 let transport_chain = tcp_transport
130 .or_transport(quic_transport)
131 .map(|either, _| either.into_inner())
132 .or_transport(webrtc_transport)
133 .map(
134 |either: futures::future::Either<
135 (libp2p::PeerId, StreamMuxerBox),
136 (libp2p::PeerId, StreamMuxerBox),
137 >,
138 _| either.into_inner(),
139 );
140
141 let dns_transport = libp2p::dns::tokio::Transport::custom(
142 transport_chain,
143 Default::default(),
144 Default::default(),
145 );
146
147 let control_handle = Arc::new(ControlHandle::new());
148 let snowflake_transport = self::transport::SnowflakeTransport::new(
149 control_handle.clone(),
150 Some(webrtc_bind_addr_cb),
151 Some(udp_protect_outgoing),
152 )
153 .upgrade(libp2p::core::upgrade::Version::V1Lazy)
154 .authenticate(libp2p::noise::Config::new(&config.keypair).unwrap())
155 .multiplex(yamux_config.clone())
156 .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
157
158 let outer_transport = snowflake_transport
160 .or_transport(dns_transport)
161 .map(|either, _| either.into_inner());
162
163 let measured_transport = libp2p::metrics::BandwidthTransport::new(outer_transport, registry)
164 .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)));
165
166 let final_transport = measured_transport.boxed();
167
168 let behaviour = SnowstormBehaviour::from(&config);
169 let local_peer_id = config.keypair.public().to_peer_id();
170
171 let swarm_config = libp2p::swarm::Config::with_tokio_executor()
172 .with_idle_connection_timeout(Duration::from_secs(u64::MAX))
173 .with_per_connection_event_buffer_size(128);
174
175 let swarm = libp2p::swarm::Swarm::new(final_transport, behaviour, local_peer_id, swarm_config);
176
177 (swarm, control_handle)
178}
179
180fn tcp_protect_outgoing(socket: &Socket) -> io::Result<()> {
181 debug!("tcp_protect_outgoing: called");
182 let config = CONFIG.lock().unwrap();
183
184 let protect_fn = config.protect_socket_fn.clone();
185
186 #[cfg(not(target_os = "windows"))]
187 if let Some(protect_fn) = protect_fn {
188 let fd = socket.as_fd().as_raw_fd();
189 protect_fn.call(fd);
190 }
191
192 Ok(())
193}
194
195fn udp_protect_outgoing(socket: &UdpSocket) -> io::Result<()> {
196 debug!("udp_protect_outgoing: called");
197 let config = CONFIG.lock().unwrap();
198
199 let protect_fn = config.protect_socket_fn.clone();
200
201 #[cfg(target_os = "macos")]
202 {
203 let s2 = socket2::SockRef::from(socket);
206 let iface = get_default_interface();
207
208 debug!(
209 "udp_protect_outgoing: bind {}",
210 iface
211 .as_ref()
212 .map(|iface| iface.name.clone())
213 .unwrap_or("none".to_string())
214 );
215 let if_index = iface.map(|iface| iface.index.try_into().unwrap());
216
217 if let Ok(local_addr) = socket.local_addr() {
218 if local_addr.is_ipv4() {
219 s2.bind_device_by_index_v4(if_index)?;
220 } else {
221 s2.bind_device_by_index_v6(if_index)?;
222 }
223 }
224 }
225
226 #[cfg(not(target_os = "windows"))]
227 if let Some(protect_fn) = protect_fn {
228 let fd = socket.as_raw_fd();
229 protect_fn.call(fd);
230 }
231
232 Ok(())
233}
234
235fn tcp_outgoing_cb(socket: &Socket, dst_addr: &SocketAddr) -> io::Result<()> {
236 let _ = tcp_protect_outgoing(socket);
238
239 let addrs = OUTBOUND_ADDRS.read().clone();
240
241 if addrs.is_empty() {
242 return Ok(());
243 }
244
245 debug!("tcp_outgoing_cb: {:?}", addrs);
246
247 #[cfg(target_os = "macos")]
254 {
255 let iface = get_default_interface();
258 debug!(
259 "tcp_outgoing_cb: bind {}",
260 iface
261 .as_ref()
262 .map(|iface| iface.name.clone())
263 .unwrap_or("none".to_string())
264 );
265 let if_index = iface.map(|iface| iface.index.try_into().unwrap());
266
267 if dst_addr.is_ipv4() {
268 socket.bind_device_by_index_v4(if_index)
269 } else {
270 socket.bind_device_by_index_v6(if_index)
271 }
272 }
273
274 #[cfg(not(target_os = "macos"))]
275 {
276 let bind_addr = outbound_addr_for_dst(addrs, dst_addr)?;
277 debug!("tcp_outgoing_cb: bind {}", bind_addr);
278
279 socket.bind(&bind_addr.into())
280 }
281}
282
283fn quic_eligible_listener_cb(ln_addr: &SocketAddr, dst_addr: &SocketAddr) -> bool {
284 let addrs = OUTBOUND_ADDRS.read().clone();
285
286 if addrs.is_empty() {
287 return true;
288 }
289
290 debug!("quic_eligible_Listener_cb: {} {}", ln_addr, dst_addr);
291
292 match outbound_addr_for_dst(addrs, dst_addr) {
293 Ok(addr) => {
294 let ok = ln_addr.ip() == addr.ip();
295 debug!(
296 "quic_eligible_Listener_cb: got addr: {} ?= {} ({})",
297 ln_addr, dst_addr, ok
298 );
299
300 return ok;
301 }
302 Err(_) => false,
303 }
304}
305
306fn quic_outgoing_dial_addr_cb(
307 ln_addr: &SocketAddr,
308 dst_addr: &SocketAddr,
309) -> io::Result<SocketAddr> {
310 let addrs = OUTBOUND_ADDRS.read().clone();
311
312 debug!("quic_outgoing_dial_addr_cb: called");
313
314 match addrs.is_empty() {
315 true => Ok(ln_addr.clone()),
316 false => outbound_addr_for_dst(addrs, dst_addr),
317 }
318}
319
320fn webrtc_bind_addr_cb(candidate: &SocketAddr) -> io::Result<SocketAddr> {
321 let addrs = OUTBOUND_ADDRS.read().clone();
322
323 debug!("webrtc_outgoing_dial_addr_cb: called");
324
325 match addrs.is_empty() {
326 true => Ok(candidate.clone()),
327 false => outbound_addr_for_dst(addrs, candidate),
328 }
329}
330
331fn outbound_addr_for_dst(addrs: Vec<IpAddr>, dst_addr: &SocketAddr) -> io::Result<SocketAddr> {
332 let dst_is_ipv4 = dst_addr.is_ipv4();
341
342 for ip in addrs.into_iter() {
343 if ip.is_ipv4() == dst_is_ipv4 {
345 debug!("outbound_addr_for_dst: ({}), {}", dst_addr, ip);
346 return Ok(SocketAddr::new(ip, 0));
348 }
349 if ip.is_ipv6() == !dst_is_ipv4 {
350 debug!("outbound_addr_for_dst: ({}), {}", dst_addr, ip);
351 return Ok(SocketAddr::new(ip, 0));
353 }
354 }
355
356 warn!("failed to find outbound address for dest: {}", dst_addr);
357
358 Err(io::Error::new(
359 io::ErrorKind::Other,
360 "failed to find eligible outbound_addr",
361 ))
362}
363
364fn multiaddr_to_ipaddr(addr: &Multiaddr) -> io::Result<IpAddr> {
365 let mut ma = addr.clone();
366
367 while let Some(proto) = ma.pop() {
368 match proto {
369 Protocol::Ip4(ip) => {
370 return Ok(IpAddr::V4(ip.into()));
371 }
372 Protocol::Ip6(ip) => {
373 return Ok(IpAddr::V6(ip.into()));
374 }
375 _ => {}
376 }
377 }
378
379 warn!("failed to parse Multiaddr as IP: {}", addr);
380
381 Err(io::Error::new(
382 io::ErrorKind::Other,
383 "failed to parse Multiaddr as IpAddr",
384 ))
385}
386
387pub fn multiaddr_is_public(addr: &Multiaddr) -> bool {
388 let ip = match multiaddr_to_ipaddr(addr) {
389 Ok(ip) => ip,
390 Err(_) => return false,
391 };
392
393 match ip {
394 IpAddr::V4(ip) => ipv4_is_global(&ip),
395 IpAddr::V6(ip) => ipv6_is_global(&ip),
396 }
397}
398
399pub fn multiaddr_is_loopback(addr: &Multiaddr) -> bool {
400 let ip = match multiaddr_to_ipaddr(addr) {
401 Ok(ip) => ip,
402 Err(_) => return false,
403 };
404
405 match ip {
406 IpAddr::V4(ip) => ip.is_loopback(),
407 IpAddr::V6(ip) => ip.is_loopback(),
408 }
409}
410
411#[inline]
412pub fn ipv4_is_global(ip: &Ipv4Addr) -> bool {
413 !(ip.octets()[0] == 0 || ip.is_private()
415 || (ip.octets()[0] == 100 && (ip.octets()[1] & 0b1100_0000 == 0b0100_0000)) || ip.is_loopback()
417 || ip.is_link_local()
418 || (
421 ip.octets()[0] == 192 && ip.octets()[1] == 0 && ip.octets()[2] == 0
422 && ip.octets()[3] != 9 && ip.octets()[3] != 10
423 )
424 || ip.is_documentation()
425 || (ip.octets()[0] == 198 && (ip.octets()[1] & 0xfe) == 18) || (ip.octets()[0] & 240 == 240 && !ip.is_broadcast()) || ip.is_broadcast())
428}
429
430#[inline]
431pub fn ipv6_is_global(ip: &Ipv6Addr) -> bool {
432 !(ip.is_unspecified()
433 || ip.is_loopback()
434 || matches!(ip.segments(), [0, 0, 0, 0, 0, 0xffff, _, _])
436 || matches!(ip.segments(), [0x64, 0xff9b, 1, _, _, _, _, _])
438 || matches!(ip.segments(), [0x100, 0, 0, 0, _, _, _, _])
440 || (matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200)
442 && !(
443 u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001
445 || u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002
447 || matches!(ip.segments(), [0x2001, 3, _, _, _, _, _, _])
449 || matches!(ip.segments(), [0x2001, 4, 0x112, _, _, _, _, _])
451 || matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if b >= 0x20 && b <= 0x3F)
454 ))
455 || matches!(ip.segments(), [0x2002, _, _, _, _, _, _, _])
458 || ((ip.segments()[0] == 0x2001) && (ip.segments()[1] == 0xdb8)) || ((ip.segments()[0] & 0xfe00) == 0xfc00) || ((ip.segments()[0] & 0xffc0) == 0xfe80)) }