snowstorm/net/
mod.rs

1pub mod behaviour;
2mod discovery;
3
4pub mod connected_exits;
5mod connections;
6mod control_peer;
7pub mod endpoint;
8pub mod event;
9pub mod metrics;
10pub mod netif;
11pub mod protocols;
12mod roles;
13pub mod swarm;
14
15use connected_exits::ConnectedExits;
16use connections::ConnectionsMap;
17use futures::StreamExt;
18use libp2p::metrics::Recorder;
19pub use libp2p::metrics::Registry;
20use libp2p::swarm::dial_opts::DialOpts;
21use std::net::Ipv4Addr;
22use std::sync::Mutex;
23use tokio_util::sync::CancellationToken;
24// use libp2p::core::ConnectedPoint;
25use libp2p::multiaddr::Protocol;
26use serde::Deserialize;
27use std::fmt;
28use std::{net::TcpListener, sync::Arc};
29use tokio::sync::{broadcast, mpsc};
30
31use log::*;
32
33pub use libp2p::swarm::ConnectionError;
34use libp2p::swarm::SwarmEvent;
35pub use libp2p::PeerId;
36
37use self::{event::Event, swarm::build_swarm};
38use crate::net::control_peer::ControlPeerEvent;
39use crate::net::discovery::role_to_namespace;
40use crate::{
41    config::{Config, Multiaddr},
42    control,
43    identity::Role,
44    net::{
45        behaviour::SnowstormBehaviourEvent,
46        discovery::{register, search_role},
47    },
48};
49
/// Multihash type used throughout the crate, parameterised with a 64-byte digest buffer.
pub type Multihash = multihash::Multihash<64>;
/// Multihash code `0`; presumably the multicodec "identity" function, as the name
/// suggests — verify against the multicodec table.
pub const MULTIHASH_IDENTITY_CODE: u64 = 0;

/// IPv4 address used to recognise the snowstorm TUN interface: [`get_tun_interface`]
/// looks for it and [`get_default_interface`] excludes any interface carrying it.
// NOTE(review): 192.18.0.1 is not inside the 198.18.0.0/15 benchmarking range; confirm
// that this value matches the address actually assigned to the TUN device.
const DEFAULT_TUN_IPV4: Ipv4Addr = Ipv4Addr::new(192, 18, 0, 1);
54
55pub fn get_tun_interface() -> Option<netdev::Interface> {
56    netdev::get_interfaces()
57        .iter()
58        .find(|iface| iface.ipv4.iter().any(|net| net.addr() == DEFAULT_TUN_IPV4))
59        .cloned()
60}
61
62pub fn get_default_interface() -> Option<netdev::Interface> {
63    let mut interfaces = netdev::get_interfaces()
64        .into_iter()
65        .filter(|iface| {
66            // filtering out by name and not .is_tun(), some cell interfaces are point to point.
67            let is_tun = iface.name.contains("tun");
68            let is_snowstorm = iface.ipv4.iter().any(|net| net.addr() == DEFAULT_TUN_IPV4);
69            let is_up = iface.is_up();
70            
71            // On iOS there's often a bunch of interfaces, some of en* are not meant for internet traffic
72            // so for the time being ignoring them manually...
73            #[cfg(target_os = "ios")]
74            if iface.if_type == netdev::interface::InterfaceType::Ethernet {
75                if !(iface.name == "en0" || iface.name == "en1") {
76                    return false;
77                }
78            }
79
80            let has_ipv4 = iface.ipv4.iter().any(|net| !net.addr().is_loopback());
81            let has_ipv6 = iface
82                .ipv6
83                .iter()
84                .any(|net| !net.addr().is_unicast_link_local());
85
86            return is_up && !iface.is_loopback() && !is_snowstorm && !is_tun && (has_ipv4 || has_ipv6);
87        })
88        .collect::<Vec<_>>();
89
90    interfaces.sort_by_key(|iface| {
91        // On windows we don't have gateway feature enabled, so not getting info about default iface.
92        #[cfg(not(target_os = "windows"))]
93        {
94            if iface.default {
95                return 0;
96            }
97        }
98        
99        // Friendly name is useful on windows, since norlam name is a GUID.
100        let names = [
101            iface.name.to_lowercase(),
102            iface.friendly_name.as_ref().map(|s| s.to_lowercase()).unwrap_or_default(),
103        ];
104
105        let is_en = iface.if_type == netdev::interface::InterfaceType::Ethernet;
106        let is_wifi = iface.if_type == netdev::interface::InterfaceType::Wireless80211;
107        let has_wifi_name = names.iter().any(|name| name.contains("wlan") || name.contains("wlp") || name.contains("wi-fi"));
108        if is_en || is_wifi || has_wifi_name {
109            return 1;
110        }
111
112        return 2;
113    });
114
115    let names = interfaces.iter().map(|iface| iface.name.clone()).collect::<Vec<_>>();
116    let friendly_names = interfaces
117        .iter()
118        .filter_map(|iface| iface.friendly_name.as_ref())
119        .map(|s| s.to_string())
120        .collect::<Vec<_>>();
121    log::info!("Available interfaces: {:?}", names);
122    log::info!("Available friendly interfaces: {:?}", friendly_names);
123
124    return interfaces.first().cloned();
125}
126
127pub fn get_outbound_ipv4() -> Option<Ipv4Addr> {
128    let default_interface = get_default_interface();
129
130    default_interface
131        .and_then(|iface| iface.ipv4.first().cloned())
132        .map(|net| net.addr())
133}
134
135pub fn get_outbound_ips() -> Vec<Multiaddr> {
136    let default_interface = get_default_interface();
137
138    info!("default interface: {:?}", default_interface);
139
140    match default_interface {
141        Some(iface) => {
142            let mut addrs = Vec::<Multiaddr>::new();
143            if let Some(ipv4) = iface.ipv4.first() {
144                addrs.push(Multiaddr::from(ipv4.addr()));
145            }
146            if let Some(ipv6) = iface
147                .ipv6
148                .into_iter()
149                .find(|net| !net.addr().is_unicast_link_local())
150            {
151                addrs.push(Multiaddr::from(ipv6.addr()));
152            }
153            addrs
154        }
155        None => Vec::new(),
156    }
157}
158
159pub fn get_available_local_port() -> Option<u16> {
160    (1024..65535).find(|port| port_is_available(*port))
161}
162
/// Reports whether `port` can be bound on `127.0.0.1` right now.
///
/// The listener created for the probe is dropped before returning, which releases the port.
fn port_is_available(port: u16) -> bool {
    TcpListener::bind(("127.0.0.1", port)).is_ok()
}
169
/// Cheaply clonable, thread-safe callback that receives a socket file descriptor.
///
/// The wrapped closure is shared behind an `Arc`, so clones invoke the same callback.
#[derive(Clone)]
pub struct ProtectSocketFn(pub Arc<dyn Fn(i32) + Send + Sync>);

/// The closure itself cannot be printed, so a fixed placeholder is emitted instead.
impl fmt::Debug for ProtectSocketFn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "protect_socket_fn")
    }
}

impl ProtectSocketFn {
    /// Invokes the wrapped callback with the file descriptor `fd`.
    pub fn call(&self, fd: i32) {
        let callback = &self.0;
        callback(fd);
    }
}
184
/// Handle returned by [`start`]; bundles the channels and shared state needed to talk to
/// the background swarm task.
pub struct Handle {
    /// Sender half of the control channel polled by the swarm task; sending
    /// `Event::Stop` through it makes the task shut down.
    pub control_tx: mpsc::Sender<Event>,
    /// Receiver created together with the swarm event broadcast channel.
    pub swarm_rx: broadcast::Receiver<Event>,
    /// Sender half of the swarm event broadcast channel; the swarm task publishes its
    /// events through a clone of it.
    pub swarm_tx: broadcast::Sender<Event>,
    /// Port returned by the client role when it started successfully, `None` otherwise.
    pub sserver_port: Option<u16>,
    /// Connected-exit state shared with the swarm task.
    pub connected_exits: Arc<ConnectedExits>,
}

impl Handle {
    /// Creates an additional receiver for the swarm event broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.swarm_tx.subscribe()
    }
}
198
199// Main entry point into the SDK start method.
200pub async fn start(
201    config: &Config,
202    registry: Arc<Mutex<Registry>>,
203) -> Result<Handle, Box<dyn std::error::Error>> {
204    let (swarm_tx, swarm_rx) = broadcast::channel::<Event>(8);
205    let (control_tx, mut control_rx) = mpsc::channel::<Event>(8);
206    let cancellation_token = CancellationToken::new();
207    let connections_map = Arc::new(ConnectionsMap::new());
208    let is_client = config.roles.contains(&Role::Client);
209    let is_exit = config.roles.contains(&Role::Exit);
210
211    let _ = tracing_subscriber::fmt()
212        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
213        .try_init();
214
215    let mut max_streams: usize = 512;
216
217    // File descriptor limit should be increased to the extent which makes sense.
218    // Please verify config fdlimit heuristics
219    if config.fdlimit {
220        match fdlimit::raise_fd_limit() {
221            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
222                info!("raised fd limit from {:?} to {:?}", from, to);
223                // TODO: HACK HACK HACK HACK
224                //
225                // For some reason to was much less than from, and this was causing
226                // a panic for a negative unsigned... so we skipped it for now
227                //
228                // PLEASE VERIFY AND FIX
229                println!("raised fd limit from {:?} to {:?}", from, to);
230                // max_streams = (to - from) as usize;
231                max_streams = to as usize;
232            }
233            _ => warn!("failed to raise fd limit"),
234        }
235    }
236
237    let (mut swarm, snowstorm_metrics, swarm_metrics, control_handle) = {
238        let mut metrics_registry = registry.lock().unwrap();
239
240        // Create and register our customer metric types
241        let metrics = metrics::Metrics::new(&mut metrics_registry);
242
243        // Some built-in libp2p metrics
244        let swarm_metrics = libp2p::metrics::Metrics::new(&mut metrics_registry);
245
246        // Build the swarm, enable & register generic bandwidth metrics
247        let (s, control_handle) = build_swarm(
248            cancellation_token.clone(),
249            config.clone(),
250            &mut metrics_registry,
251            Some(max_streams),
252        );
253
254        (s, metrics, swarm_metrics, control_handle)
255    };
256
257    // Safe to unwrap for the first time. Afterwards - None.
258    let mut control_stream_requests_rx = control_handle.next_stream_request().unwrap();
259
260    let snowstorm_metrics = Arc::new(snowstorm_metrics);
261
262    // Start the listener(s)
263    for addr in config.listen_on.iter() {
264        warn!("attempting to listen on {:?}", addr);
265        let result = swarm.listen_on(addr.clone());
266        if let Err(err) = result {
267            warn!("Failed to listen on multiaddr {:?}: {:?}", addr, err);
268        }
269    }
270
271    for addr in config.external_addrs.iter() {
272        swarm.add_external_address(addr.clone());
273    }
274
275    for addr in config.entry_peers.iter() {
276        let dial_result = swarm.dial(addr.clone());
277
278        match dial_result {
279            Ok(_) => info!("connected to entry peer: {:?}", addr),
280            Err(err) => warn!("failed to dial entry peer: {:?}", err),
281        }
282    }
283
284    let mut cookie: Option<libp2p::rendezvous::Cookie> = None;
285
286    if is_exit {
287        // TODO: Should we do proper error handling here?
288        roles::exit::serve(
289            &mut swarm,
290            &config,
291            snowstorm_metrics.clone(),
292            cancellation_token.clone(),
293            connections_map.clone(),
294        );
295    }
296
297    let mut sserver_port = None;
298    let (connected_exits, mut exit_dials) = ConnectedExits::new(config.token.as_deref().unwrap_or("").to_string(), cancellation_token.clone());
299    let connected_exits = Arc::new(connected_exits);
300    let connected_exits2 = connected_exits.clone();
301
302    // NOTE(2025.03.04): This was the source of an annoying MacOS panic by directly unwrapping,
303    // and has since been updated to not crash, but probably needs much more thinking / reworking.
304    if is_client {
305        // Client serve can sometimes result in an error, but unwrapping it directly
306        // will cause an unnecessary crash. So it must be properly handled...
307        match roles::client::serve(
308            &mut swarm,
309            &config,
310            snowstorm_metrics.clone(),
311            cancellation_token.clone(),
312            connections_map.clone(),
313            connected_exits.clone(),
314        )
315        .await
316        {
317            Ok(port) => {
318                info!("[SDK] client role started on port: {}", port);
319                sserver_port = Some(port);
320            }
321            Err(e) => {
322                info!(
323                    "[SDK] Error while starting client role - but continuing: {:?}",
324                    e
325                );
326                // TODO: Figure out if we should fix something here??
327                // Or update some state somewhere?
328            }
329        }
330    }
331
332    let config = config.clone();
333    let swarm_tx_clone = swarm_tx.clone();
334    let mut stream_control = swarm.behaviour_mut().stream().unwrap().new_control();
335    let mut control_peer = control_peer::ControlPeer::new(stream_control.clone());
336
337    tokio::spawn(async move {
338        let config = config.clone();
339
340        loop {
341            tokio::select! {
342                event = swarm.select_next_some() => {
343                    swarm_metrics.record(&event);
344                    control_peer.on_swarm_event(&event);
345
346                    // We only care about connected exits if we are a client
347                    if is_client {
348                        connected_exits.on_swarm_event(&event);
349                    }
350
351                    match event {
352                        SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
353                            debug!("connection established: {:?}", peer_id);
354                            let _ = swarm_tx_clone.send(Event::ConnectionEstablished(peer_id.clone()));
355                            connections_map.connection_established(peer_id.clone(), &endpoint);
356
357                            if endpoint.is_listener() {
358                                let _ = swarm_tx_clone.send(Event::ListenerConnectionEstablished(peer_id.clone()));
359                            } else if endpoint.is_dialer() {
360                                let mad = endpoint.get_remote_address();
361                                log::info!("dialer connection established: {:?}", mad);
362                                let _ = swarm_tx_clone.send(Event::DialerConnectionEstablished(peer_id.clone(), is_snowflake(mad)));
363                            }
364
365                            if config.has_entry(&peer_id) {
366                                debug!("peer is in entry peer list, sending rendezvous request");
367                                register(&config, swarm.behaviour_mut(), peer_id.clone());
368
369                                _ = endpoint;
370                                // let multiaddr = match endpoint {
371                                //     ConnectedPoint::Dialer { address, .. } => address,
372                                //     ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
373                                // };
374                                // swarm.behaviour_mut().autonat().unwrap().add_server(peer_id, Some(multiaddr));
375                            }
376
377                            search_role(&config, swarm.behaviour_mut(), Role::Volunteer, peer_id, cookie.clone());
378                            search_role(&config, swarm.behaviour_mut(), Role::Exit, peer_id, cookie.clone());
379                        },
380                        SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
381                            debug!("connection closed: {:?}", peer_id);
382                            connections_map.connection_closed(&peer_id);
383                            let _ = swarm_tx_clone.send(Event::ConnectionClosed(peer_id, cause.map(Into::into)));
384                        },
385                        SwarmEvent::NewListenAddr { address, .. } => {
386                            debug!("new listen addr: {:?}", address);
387
388                            if let Err(err) = swarm_tx_clone.send(Event::NewListenAddr(address)) {
389                                error!("Failed to send event: {:?}", err);
390                            }
391
392                            if let Some(token) = &config.token {
393                                if !is_client {
394                                    let device_id = config.device_id.clone();
395                                    let pid = swarm.local_peer_id();
396                                    let mas: Vec<&libp2p::Multiaddr> = swarm.external_addresses().map(|addr| addr).collect();
397                                    let _ = control::update_multiaddrs(token.to_string(), pid.clone().to_string(), device_id, mas, true).await;
398                                }
399                            }
400                        },
401                        SwarmEvent::Behaviour(SnowstormBehaviourEvent::Identify(libp2p::identify::Event::Received { info, .. })) => {
402                            debug!("identify observed addr {:?}", info.observed_addr);
403
404                            if let Some(token) = &config.token {
405                                // This logic is also triggered on our exits, so we only update based on external observations as a fallback
406                                // It should be ok to update addresses like that.
407                                if !is_client {
408                                    let mut mas: Vec<Multiaddr> = swarm.listeners().map(Clone::clone).collect();
409                                    mas.push(info.observed_addr.clone());
410                                    let pid = swarm.local_peer_id();
411                                    let device_id = config.device_id.clone();
412                                    let _ = control::update_multiaddrs(token.to_string(), pid.clone().to_string(), device_id, mas.iter().collect(), false).await;
413                                }
414                            }
415                            let event = Event::NewObservedAddr(info.observed_addr);
416                            let _ = swarm_tx_clone.send(event);
417                        },
418                        SwarmEvent::Behaviour(SnowstormBehaviourEvent::RendezvousClient(libp2p::rendezvous::client::Event::Discovered {
419                            registrations,
420                            cookie: new_cookie,
421                            ..
422                        })) => {
423                            cookie.replace(new_cookie);
424                            debug!("rendezvous discovered: {:?}/{:?}", cookie.clone().unwrap().namespace(), registrations);
425                            for registration in registrations {
426                                info!("discovered rendezvous peer: {:?}", registration.record.peer_id());
427                                let event = match cookie.clone().unwrap().namespace() {
428                                    Some(namespace) => {
429                                        if namespace.to_string() == role_to_namespace(&config, Role::Exit).to_string() {
430                                            Event::ExitDiscovered(registration.record.peer_id().clone())
431                                        } else {
432                                            Event::Discovered(registration.record.peer_id().clone())
433                                        }
434                                    },
435                                    None => Event::Discovered(registration.record.peer_id().clone()),
436                                };
437
438                              let _ = swarm_tx_clone.send(event);
439                            }
440                        },
441                        _ => {}
442                    }
443                }
444                ctrl_event = control_rx.recv() => {
445                    match ctrl_event {
446                        Some(Event::Stop) => {
447                            debug!("stop received, breaking swarm loop");
448
449                            // Cancel sub-tasks
450                            cancellation_token.cancel();
451
452                            // Disconnect from all peers.
453                            let connected_peers: Vec<PeerId> = swarm.connected_peers().map(Clone::clone).collect();
454                            for peer_id in connected_peers {
455                                let _ = swarm.disconnect_peer_id(peer_id);
456                            }
457
458                            if let Some(token) = &config.token {
459                                if !is_client {
460                                    let pid = swarm.local_peer_id();
461                                    let _ = control::unregister(token, Some(&pid.to_string()), None).await;
462                                }
463                            }
464
465                            let _ = swarm_tx_clone.send(Event::Stop);
466                            break;
467                        }
468                        _ => {}
469                    }
470                }
471
472                Some(request) = control_stream_requests_rx.recv() => {
473                    info!("ControlHandle: control stream request");
474                    request.process(&mut stream_control).await;
475                }
476
477                Some((peer_id, addrs)) = exit_dials.recv() => {
478                    log::info!("Gotta dial a peer! {:?} {:?}", peer_id, addrs);
479                    let dial_opts = DialOpts::peer_id(peer_id).addresses(addrs).build();
480                    let _ = swarm.dial(dial_opts);
481                }
482
483                event = control_peer.select_next_some() => {
484                    match event {
485                        ControlPeerEvent::DialNeeded { peer_id, address: mad} => {
486                            debug!("control peer id: {:?}", peer_id);
487
488                            let dial_opts = DialOpts::peer_id(peer_id)
489                                .addresses(vec![
490                                    mad.clone(),
491                                ])
492                                .build();
493
494                            match swarm.dial(dial_opts) {
495                                Ok(_) => {
496                                    info!("dialed control peer: {:?}", mad);
497                                }
498                                Err(err) => {
499                                    error!("failed to dial control peer: {:?}", err);
500                                }
501                            }
502                        }
503                        ControlPeerEvent::Connected(peer_id) => {
504                            control_handle.add_control_peer(peer_id);
505                        },
506                        ControlPeerEvent::Disconnected(peer_id) => {
507                            control_handle.remove_control_peer(peer_id);
508                        },
509                        ControlPeerEvent::Notification(msg) => {
510                            #[derive(Debug, Deserialize)]
511                            struct SnowflakeConnectionRequest {
512                                #[serde(rename = "ID")]
513                                id: String,
514
515                                #[serde(rename = "Offer")]
516                                offer: String,
517                            }
518
519
520                            let command = match msg.get("Command") {
521                                Some(cmd) => {
522                                    cmd.as_i64().unwrap()
523                                }
524                                None => {
525                                    error!("control peer notification missing command");
526                                    continue;
527                                }
528                            };
529
530                            let _ = swarm_tx_clone.send(Event::NotificationReceived(command));
531
532                            if command == 0 {
533                                let payload = msg.get("Msg").unwrap().clone();
534                                let request = serde_json::from_value::<SnowflakeConnectionRequest>(payload).unwrap();
535                                info!("snowflake connection request: {:?}", request);
536
537                                let mad = control_handle.register_offer(&request.id, &request.offer);
538
539                                let r = swarm.listen_on(mad.clone());
540
541                                match r {
542                                    Ok(_) => {
543                                        info!("listening on snowflake connection request: {:?}", mad);
544                                    }
545                                    Err(err) => {
546                                        error!("failed to listen on snowflake connection request: {:?}", err);
547                                    }
548                                }
549                            }
550                        }
551                    }
552                },
553            }
554        }
555    });
556
557    Ok(Handle {
558        control_tx,
559        swarm_rx,
560        swarm_tx,
561        sserver_port,
562        connected_exits: connected_exits2,
563    })
564}
565
566pub async fn stop(tx: mpsc::Sender<Event>) {
567    let _ = tx.send(Event::Stop).await;
568}
569
570pub fn try_peerid_from_multiaddr(ma: &Multiaddr) -> Option<PeerId> {
571    let mut peer_id = None;
572    for part in ma.iter() {
573        if let Protocol::P2p(id) = part {
574            peer_id = Some(id.clone());
575            break;
576        }
577    }
578    peer_id
579}
580
581pub fn is_snowflake(mad: &Multiaddr) -> bool {
582    match mad.iter().next() {
583        Some(Protocol::Dns4(hostname)) => {
584            hostname.ends_with(".snowflake") || hostname == "snowflake"
585        }
586        _ => false,
587    }
588}
589
590pub fn get_snowflake_id(mad: &Multiaddr) -> Option<String> {
591    match mad.iter().next() {
592        Some(Protocol::Dns4(hostname)) => {
593            if hostname.ends_with(".snowflake") {
594                let parts: Vec<&str> = hostname.split('.').collect();
595                return Some(parts[0].to_string());
596            }
597        }
598        _ => {}
599    }
600
601    None
602}
603
604pub fn snowflake_enrich_route_id(route_id: Option<&str>, mad: &Multiaddr) -> Multiaddr {
605    // If not a snowflake, return the original mad
606    if !is_snowflake(mad) {
607        return mad.clone();
608    }
609
610    match route_id {
611        None => return mad.clone(),
612
613        Some(route_id) => {
614            let addr = format!("/dns4/{}.snowflake", route_id);
615
616            addr.parse().unwrap()
617        }
618    }
619}