snowstorm/net/
swarm.rs

1use super::behaviour::SnowstormBehaviour;
2use crate::{
3    config::Config,
4    net::{get_default_interface, netif::DebouncedWatcher},
5};
6use futures::StreamExt;
7use lazy_static::lazy_static;
8use libp2p::{core::muxing::StreamMuxerBox, multiaddr::Protocol, Multiaddr, Swarm, Transport};
9use libp2p_webrtc as webrtc;
10use log::{debug, info, warn};
11use once_cell::sync::Lazy;
12use parking_lot::RwLock;
13use prometheus_client::registry::Registry;
14use socket2::Socket;
15#[cfg(not(target_os = "windows"))]
16use std::os::fd::{AsFd, AsRawFd};
17use std::{
18    io,
19    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
20    sync::{Arc, Mutex},
21    time::Duration,
22};
23use tokio_util::sync::CancellationToken;
24use transport::signalling::ControlHandle;
25use webrtc::tokio::Certificate;
26
27pub mod auth;
28mod transport;
29
30static OUTBOUND_ADDRS: Lazy<Arc<RwLock<Vec<IpAddr>>>> = Lazy::new(|| Arc::new(RwLock::new(vec![])));
31
32lazy_static! {
33    static ref CONFIG: Mutex<Config> = Mutex::new(Default::default());
34}
35
36pub(super) fn build_swarm(
37    token: CancellationToken,
38    config: Config,
39    registry: &mut std::sync::MutexGuard<'_, Registry>,
40    num_streams: Option<usize>,
41) -> (Swarm<SnowstormBehaviour>, Arc<ControlHandle>) {
42    {
43        let mut config_guard = CONFIG.try_lock().unwrap();
44        *config_guard = config.clone();
45    }
46
47    if !config.outbound_addrs.is_empty() {
48        {
49            let mut addrs = OUTBOUND_ADDRS.write();
50
51            *addrs = config
52                .outbound_addrs
53                .iter()
54                .map(multiaddr_to_ipaddr)
55                .filter_map(Result::ok)
56                .collect();
57        }
58
59        // Debounced IF watcher to make sure we have up to date addrs when network +- stabilizes
60        if let Ok(mut set) = DebouncedWatcher::new(Duration::from_millis(500)) {
61            tokio::spawn(async move {
62                loop {
63                    tokio::select! {
64                        _ = token.cancelled() => {
65                            return;
66                        }
67                        new_outbound_addrs = set.select_next_some() => {
68                            info!(
69                                "Got IF event: Outbound IPs changed {:?}",
70                                new_outbound_addrs
71                            );
72                            let mut addrs = OUTBOUND_ADDRS.write();
73                            *addrs = new_outbound_addrs;
74
75                            info!("Updated outbound addresses updated");
76                        }
77                    }
78                }
79            });
80        } else {
81            warn!("Failed to create debounced IF watcher, using static addresses");
82        }
83    }
84
85    let yamux_config = libp2p::yamux::Config::default();
86    let mut quic_config = libp2p::quic::Config::new(&config.keypair.clone());
87    let mut tcp_config = libp2p::tcp::Config::default();
88
89    match config.outbound_addrs.len() {
90        0 => {
91            warn!("no outbound addresses configured, forced binding disabled");
92        }
93        _ => {
94            tcp_config = tcp_config.pre_connect_cb(tcp_outgoing_cb);
95
96            quic_config.socket_created_cb = Some(udp_protect_outgoing);
97            quic_config.eligible_listener_cb = Some(quic_eligible_listener_cb);
98            quic_config.outgoing_dial_addr_cb = Some(quic_outgoing_dial_addr_cb);
99        }
100    };
101
102    if let Some(num_streams) = num_streams {
103        // Setting this makes the mux upset on fly.io for some reason,
104        // so just use the default config for now.
105        //
106        // yamux_config.set_max_num_streams(num_streams);
107        quic_config.max_concurrent_stream_limit = num_streams as u32;
108        info!("increased max concurrent streams: {}", num_streams);
109    }
110
111    let tcp_transport = libp2p::tcp::tokio::Transport::new(tcp_config.clone())
112        .upgrade(libp2p::core::upgrade::Version::V1Lazy)
113        .authenticate(libp2p::noise::Config::new(&config.keypair).unwrap())
114        .multiplex(yamux_config.clone())
115        .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
116
117    let quic_transport = libp2p::quic::tokio::Transport::new(quic_config.clone())
118        .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)));
119
120    let webrtc_transport = webrtc::tokio::Transport::new(
121        config.keypair.clone(),
122        Certificate::generate(&mut rand::rng()).unwrap(),
123        Some(webrtc_bind_addr_cb),
124        Some(udp_protect_outgoing),
125    )
126    .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)))
127    .boxed();
128
129    let transport_chain = tcp_transport
130        .or_transport(quic_transport)
131        .map(|either, _| either.into_inner())
132        .or_transport(webrtc_transport)
133        .map(
134            |either: futures::future::Either<
135                (libp2p::PeerId, StreamMuxerBox),
136                (libp2p::PeerId, StreamMuxerBox),
137            >,
138             _| either.into_inner(),
139        );
140
141    let dns_transport = libp2p::dns::tokio::Transport::custom(
142        transport_chain,
143        Default::default(),
144        Default::default(),
145    );
146
147    let control_handle = Arc::new(ControlHandle::new());
148    let snowflake_transport = self::transport::SnowflakeTransport::new(
149        control_handle.clone(),
150        Some(webrtc_bind_addr_cb),
151        Some(udp_protect_outgoing),
152    )
153    .upgrade(libp2p::core::upgrade::Version::V1Lazy)
154    .authenticate(libp2p::noise::Config::new(&config.keypair).unwrap())
155    .multiplex(yamux_config.clone())
156    .map(|(p, c), _| (p, StreamMuxerBox::new(c)));
157
158    // Try snowflakes first, since we need to intercept /dns4/snowflake
159    let outer_transport = snowflake_transport
160        .or_transport(dns_transport)
161        .map(|either, _| either.into_inner());
162
163    let measured_transport = libp2p::metrics::BandwidthTransport::new(outer_transport, registry)
164        .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)));
165
166    let final_transport = measured_transport.boxed();
167
168    let behaviour = SnowstormBehaviour::from(&config);
169    let local_peer_id = config.keypair.public().to_peer_id();
170
171    let swarm_config = libp2p::swarm::Config::with_tokio_executor()
172        .with_idle_connection_timeout(Duration::from_secs(u64::MAX))
173        .with_per_connection_event_buffer_size(128);
174
175    let swarm = libp2p::swarm::Swarm::new(final_transport, behaviour, local_peer_id, swarm_config);
176
177    (swarm, control_handle)
178}
179
180fn tcp_protect_outgoing(socket: &Socket) -> io::Result<()> {
181    debug!("tcp_protect_outgoing: called");
182    let config = CONFIG.lock().unwrap();
183
184    let protect_fn = config.protect_socket_fn.clone();
185
186    #[cfg(not(target_os = "windows"))]
187    if let Some(protect_fn) = protect_fn {
188        let fd = socket.as_fd().as_raw_fd();
189        protect_fn.call(fd);
190    }
191
192    Ok(())
193}
194
195fn udp_protect_outgoing(socket: &UdpSocket) -> io::Result<()> {
196    debug!("udp_protect_outgoing: called");
197    let config = CONFIG.lock().unwrap();
198
199    let protect_fn = config.protect_socket_fn.clone();
200
201    #[cfg(target_os = "macos")]
202    {
203        // Something changed in the new macos, so instead of binding to a specific interface via
204        // its address we bind via its index. Might be related to tethering/personal hotspot only.
205        let s2 = socket2::SockRef::from(socket);
206        let iface = get_default_interface();
207
208        debug!(
209            "udp_protect_outgoing: bind {}",
210            iface
211                .as_ref()
212                .map(|iface| iface.name.clone())
213                .unwrap_or("none".to_string())
214        );
215        let if_index = iface.map(|iface| iface.index.try_into().unwrap());
216
217        if let Ok(local_addr) = socket.local_addr() {
218            if local_addr.is_ipv4() {
219                s2.bind_device_by_index_v4(if_index)?;
220            } else {
221                s2.bind_device_by_index_v6(if_index)?;
222            }
223        }
224    }
225
226    #[cfg(not(target_os = "windows"))]
227    if let Some(protect_fn) = protect_fn {
228        let fd = socket.as_raw_fd();
229        protect_fn.call(fd);
230    }
231
232    Ok(())
233}
234
235fn tcp_outgoing_cb(socket: &Socket, dst_addr: &SocketAddr) -> io::Result<()> {
236    // Protect the socket (Android)
237    let _ = tcp_protect_outgoing(socket);
238
239    let addrs = OUTBOUND_ADDRS.read().clone();
240
241    if addrs.is_empty() {
242        return Ok(());
243    }
244
245    debug!("tcp_outgoing_cb: {:?}", addrs);
246
247    // Bind the socket to the appropriate outbound interface.
248    //
249    // WARNING: If we ever decide to reuse ports, this might need
250    // to be more involved (though the reuse-port code calls bind,
251    // so perhaps not?).
252
253    #[cfg(target_os = "macos")]
254    {
255        // Something changed in the new macos, so instead of binding to a specific interface via
256        // its address we bind via its index. Might be related to tethering/personal hotspot only.
257        let iface = get_default_interface();
258        debug!(
259            "tcp_outgoing_cb: bind {}",
260            iface
261                .as_ref()
262                .map(|iface| iface.name.clone())
263                .unwrap_or("none".to_string())
264        );
265        let if_index = iface.map(|iface| iface.index.try_into().unwrap());
266
267        if dst_addr.is_ipv4() {
268            socket.bind_device_by_index_v4(if_index)
269        } else {
270            socket.bind_device_by_index_v6(if_index)
271        }
272    }
273
274    #[cfg(not(target_os = "macos"))]
275    {
276        let bind_addr = outbound_addr_for_dst(addrs, dst_addr)?;
277        debug!("tcp_outgoing_cb: bind {}", bind_addr);
278
279        socket.bind(&bind_addr.into())
280    }
281}
282
283fn quic_eligible_listener_cb(ln_addr: &SocketAddr, dst_addr: &SocketAddr) -> bool {
284    let addrs = OUTBOUND_ADDRS.read().clone();
285
286    if addrs.is_empty() {
287        return true;
288    }
289
290    debug!("quic_eligible_Listener_cb: {} {}", ln_addr, dst_addr);
291
292    match outbound_addr_for_dst(addrs, dst_addr) {
293        Ok(addr) => {
294            let ok = ln_addr.ip() == addr.ip();
295            debug!(
296                "quic_eligible_Listener_cb: got addr: {} ?= {} ({})",
297                ln_addr, dst_addr, ok
298            );
299
300            return ok;
301        }
302        Err(_) => false,
303    }
304}
305
306fn quic_outgoing_dial_addr_cb(
307    ln_addr: &SocketAddr,
308    dst_addr: &SocketAddr,
309) -> io::Result<SocketAddr> {
310    let addrs = OUTBOUND_ADDRS.read().clone();
311
312    debug!("quic_outgoing_dial_addr_cb: called");
313
314    match addrs.is_empty() {
315        true => Ok(ln_addr.clone()),
316        false => outbound_addr_for_dst(addrs, dst_addr),
317    }
318}
319
320fn webrtc_bind_addr_cb(candidate: &SocketAddr) -> io::Result<SocketAddr> {
321    let addrs = OUTBOUND_ADDRS.read().clone();
322
323    debug!("webrtc_outgoing_dial_addr_cb: called");
324
325    match addrs.is_empty() {
326        true => Ok(candidate.clone()),
327        false => outbound_addr_for_dst(addrs, candidate),
328    }
329}
330
/// Selects the local address to bind to when connecting to `dst_addr`.
///
/// Returns the first entry of `addrs` whose address family (IPv4/IPv6) matches
/// that of `dst_addr`, paired with port `0` so the OS picks the port.
///
/// # Errors
///
/// Returns an `io::ErrorKind::Other` error when `addrs` contains no address of
/// the destination's family.
fn outbound_addr_for_dst(addrs: Vec<IpAddr>, dst_addr: &SocketAddr) -> io::Result<SocketAddr> {
    // Note: This is used by both QUIC and TCP/IP.  The libp2p QUIC
    // transport only supports having one non-listener dialer per AF,
    // which is unfortunate, so this routine consistently returns the
    // first external address for now.
    //
    // As far as I can tell the transport basically will totally freak
    // out if the external address changes mid-way as well.

    let dst_is_ipv4 = dst_addr.is_ipv4();

    // Pick the first interface that has the same AF. An `IpAddr` is either v4
    // or v6, so comparing `is_ipv4()` covers both families in one test.
    if let Some(ip) = addrs.into_iter().find(|ip| ip.is_ipv4() == dst_is_ipv4) {
        debug!("outbound_addr_for_dst: ({}), {}", dst_addr, ip);
        // Have the OS pick a port for us.
        return Ok(SocketAddr::new(ip, 0));
    }

    warn!("failed to find outbound address for dest: {}", dst_addr);

    Err(io::Error::new(
        io::ErrorKind::Other,
        "failed to find eligible outbound_addr",
    ))
}
363
364fn multiaddr_to_ipaddr(addr: &Multiaddr) -> io::Result<IpAddr> {
365    let mut ma = addr.clone();
366
367    while let Some(proto) = ma.pop() {
368        match proto {
369            Protocol::Ip4(ip) => {
370                return Ok(IpAddr::V4(ip.into()));
371            }
372            Protocol::Ip6(ip) => {
373                return Ok(IpAddr::V6(ip.into()));
374            }
375            _ => {}
376        }
377    }
378
379    warn!("failed to parse Multiaddr as IP: {}", addr);
380
381    Err(io::Error::new(
382        io::ErrorKind::Other,
383        "failed to parse Multiaddr as IpAddr",
384    ))
385}
386
387pub fn multiaddr_is_public(addr: &Multiaddr) -> bool {
388    let ip = match multiaddr_to_ipaddr(addr) {
389        Ok(ip) => ip,
390        Err(_) => return false,
391    };
392
393    match ip {
394        IpAddr::V4(ip) => ipv4_is_global(&ip),
395        IpAddr::V6(ip) => ipv6_is_global(&ip),
396    }
397}
398
399pub fn multiaddr_is_loopback(addr: &Multiaddr) -> bool {
400    let ip = match multiaddr_to_ipaddr(addr) {
401        Ok(ip) => ip,
402        Err(_) => return false,
403    };
404
405    match ip {
406        IpAddr::V4(ip) => ip.is_loopback(),
407        IpAddr::V6(ip) => ip.is_loopback(),
408    }
409}
410
/// Returns `true` if `ip` is not in any of the special-purpose IPv4 ranges
/// checked below (this network, private, shared, loopback, link-local,
/// `192.0.0.0/24` except `.9`/`.10`, documentation, benchmarking, reserved,
/// broadcast).
#[inline]
pub fn ipv4_is_global(ip: &Ipv4Addr) -> bool {
    let [a, b, c, d] = ip.octets();

    // "This network" (`0.0.0.0/8`)
    let this_network = a == 0;
    // Shared address space (`100.64.0.0/10`)
    let shared = a == 100 && (b & 0b1100_0000) == 0b0100_0000;
    // Addresses reserved for future protocols (`192.0.0.0/24`);
    // .9 and .10 are documented as globally reachable so they're excluded.
    let future_protocols = a == 192 && b == 0 && c == 0 && d != 9 && d != 10;
    // Benchmarking (`198.18.0.0/15`)
    let benchmarking = a == 198 && (b & 0xfe) == 18;
    // Reserved (`240.0.0.0/4`, broadcast handled separately)
    let reserved = (a & 240) == 240 && !ip.is_broadcast();

    let special_purpose = this_network
        || ip.is_private()
        || shared
        || ip.is_loopback()
        || ip.is_link_local()
        || future_protocols
        || ip.is_documentation()
        || benchmarking
        || reserved
        || ip.is_broadcast();

    !special_purpose
}
429
/// Returns `true` if `ip` is not in any of the special-purpose IPv6 ranges
/// checked below (unspecified, loopback, IPv4-mapped, `64:ff9b:1::/48`,
/// discard-only, `2001::/23` minus its globally reachable carve-outs, 6to4,
/// documentation, unique-local, unicast link-local).
#[inline]
pub fn ipv6_is_global(ip: &Ipv6Addr) -> bool {
    let segs = ip.segments();
    let bits = u128::from_be_bytes(ip.octets());
    let in_2001 = segs[0] == 0x2001;

    // IETF Protocol Assignments (`2001::/23`)
    let ietf_assignment = in_2001 && segs[1] < 0x200;

    // Carve-outs of `2001::/23` that count as globally reachable.
    let ietf_carve_out =
        // Port Control Protocol Anycast (`2001:1::1`)
        bits == 0x2001_0001_0000_0000_0000_0000_0000_0001
        // Traversal Using Relays around NAT Anycast (`2001:1::2`)
        || bits == 0x2001_0001_0000_0000_0000_0000_0000_0002
        // AMT (`2001:3::/32`)
        || (in_2001 && segs[1] == 3)
        // AS112-v6 (`2001:4:112::/48`)
        || (in_2001 && segs[1] == 4 && segs[2] == 0x112)
        // ORCHIDv2 (`2001:20::/28`)
        // Drone Remote ID Protocol Entity Tags (DETs) Prefix (`2001:30::/28`)
        || (in_2001 && (0x20..=0x3F).contains(&segs[1]));

    let special_purpose = ip.is_unspecified()
        || ip.is_loopback()
        // IPv4-mapped Address (`::ffff:0:0/96`)
        || segs[..6] == [0, 0, 0, 0, 0, 0xffff]
        // IPv4-IPv6 Translat. (`64:ff9b:1::/48`)
        || segs[..3] == [0x64, 0xff9b, 1]
        // Discard-Only Address Block (`100::/64`)
        || segs[..4] == [0x100, 0, 0, 0]
        || (ietf_assignment && !ietf_carve_out)
        // 6to4 (`2002::/16`) – it's not explicitly documented as globally reachable,
        // IANA says N/A.
        || segs[0] == 0x2002
        // Documentation (`2001:db8::/32`)
        || (in_2001 && segs[1] == 0xdb8)
        // Unique local (`fc00::/7`)
        || (segs[0] & 0xfe00) == 0xfc00
        // Unicast link-local (`fe80::/10`)
        || (segs[0] & 0xffc0) == 0xfe80;

    !special_purpose
}
461}