1pub mod behaviour;
2mod discovery;
3
4pub mod connected_exits;
5mod connections;
6mod control_peer;
7pub mod endpoint;
8pub mod event;
9pub mod metrics;
10pub mod netif;
11pub mod protocols;
12mod roles;
13pub mod swarm;
14
15use connected_exits::ConnectedExits;
16use connections::ConnectionsMap;
17use futures::StreamExt;
18use libp2p::metrics::Recorder;
19pub use libp2p::metrics::Registry;
20use libp2p::swarm::dial_opts::DialOpts;
21use std::net::Ipv4Addr;
22use std::sync::Mutex;
23use tokio_util::sync::CancellationToken;
24use libp2p::multiaddr::Protocol;
26use serde::Deserialize;
27use std::fmt;
28use std::{net::TcpListener, sync::Arc};
29use tokio::sync::{broadcast, mpsc};
30
31use log::*;
32
33pub use libp2p::swarm::ConnectionError;
34use libp2p::swarm::SwarmEvent;
35pub use libp2p::PeerId;
36
37use self::{event::Event, swarm::build_swarm};
38use crate::net::control_peer::ControlPeerEvent;
39use crate::net::discovery::role_to_namespace;
40use crate::{
41 config::{Config, Multiaddr},
42 control,
43 identity::Role,
44 net::{
45 behaviour::SnowstormBehaviourEvent,
46 discovery::{register, search_role},
47 },
48};
49
50pub type Multihash = multihash::Multihash<64>;
51pub const MULTIHASH_IDENTITY_CODE: u64 = 0;
52
53const DEFAULT_TUN_IPV4: Ipv4Addr = Ipv4Addr::new(192, 18, 0, 1);
54
55pub fn get_tun_interface() -> Option<netdev::Interface> {
56 netdev::get_interfaces()
57 .iter()
58 .find(|iface| iface.ipv4.iter().any(|net| net.addr() == DEFAULT_TUN_IPV4))
59 .cloned()
60}
61
62pub fn get_default_interface() -> Option<netdev::Interface> {
63 let mut interfaces = netdev::get_interfaces()
64 .into_iter()
65 .filter(|iface| {
66 let is_tun = iface.name.contains("tun");
68 let is_snowstorm = iface.ipv4.iter().any(|net| net.addr() == DEFAULT_TUN_IPV4);
69 let is_up = iface.is_up();
70
71 #[cfg(target_os = "ios")]
74 if iface.if_type == netdev::interface::InterfaceType::Ethernet {
75 if !(iface.name == "en0" || iface.name == "en1") {
76 return false;
77 }
78 }
79
80 let has_ipv4 = iface.ipv4.iter().any(|net| !net.addr().is_loopback());
81 let has_ipv6 = iface
82 .ipv6
83 .iter()
84 .any(|net| !net.addr().is_unicast_link_local());
85
86 return is_up && !iface.is_loopback() && !is_snowstorm && !is_tun && (has_ipv4 || has_ipv6);
87 })
88 .collect::<Vec<_>>();
89
90 interfaces.sort_by_key(|iface| {
91 #[cfg(not(target_os = "windows"))]
93 {
94 if iface.default {
95 return 0;
96 }
97 }
98
99 let names = [
101 iface.name.to_lowercase(),
102 iface.friendly_name.as_ref().map(|s| s.to_lowercase()).unwrap_or_default(),
103 ];
104
105 let is_en = iface.if_type == netdev::interface::InterfaceType::Ethernet;
106 let is_wifi = iface.if_type == netdev::interface::InterfaceType::Wireless80211;
107 let has_wifi_name = names.iter().any(|name| name.contains("wlan") || name.contains("wlp") || name.contains("wi-fi"));
108 if is_en || is_wifi || has_wifi_name {
109 return 1;
110 }
111
112 return 2;
113 });
114
115 let names = interfaces.iter().map(|iface| iface.name.clone()).collect::<Vec<_>>();
116 let friendly_names = interfaces
117 .iter()
118 .filter_map(|iface| iface.friendly_name.as_ref())
119 .map(|s| s.to_string())
120 .collect::<Vec<_>>();
121 log::info!("Available interfaces: {:?}", names);
122 log::info!("Available friendly interfaces: {:?}", friendly_names);
123
124 return interfaces.first().cloned();
125}
126
127pub fn get_outbound_ipv4() -> Option<Ipv4Addr> {
128 let default_interface = get_default_interface();
129
130 default_interface
131 .and_then(|iface| iface.ipv4.first().cloned())
132 .map(|net| net.addr())
133}
134
135pub fn get_outbound_ips() -> Vec<Multiaddr> {
136 let default_interface = get_default_interface();
137
138 info!("default interface: {:?}", default_interface);
139
140 match default_interface {
141 Some(iface) => {
142 let mut addrs = Vec::<Multiaddr>::new();
143 if let Some(ipv4) = iface.ipv4.first() {
144 addrs.push(Multiaddr::from(ipv4.addr()));
145 }
146 if let Some(ipv6) = iface
147 .ipv6
148 .into_iter()
149 .find(|net| !net.addr().is_unicast_link_local())
150 {
151 addrs.push(Multiaddr::from(ipv6.addr()));
152 }
153 addrs
154 }
155 None => Vec::new(),
156 }
157}
158
159pub fn get_available_local_port() -> Option<u16> {
160 (1024..65535).find(|port| port_is_available(*port))
161}
162
163fn port_is_available(port: u16) -> bool {
164 match TcpListener::bind(("127.0.0.1", port)) {
165 Ok(_) => true,
166 Err(_) => false,
167 }
168}
169
170#[derive(Clone)]
171pub struct ProtectSocketFn(pub Arc<dyn Fn(i32) -> () + Send + Sync>);
172
173impl fmt::Debug for ProtectSocketFn {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.write_str("protect_socket_fn")
176 }
177}
178
179impl ProtectSocketFn {
180 pub fn call(&self, fd: i32) {
181 (self.0)(fd);
182 }
183}
184
185pub struct Handle {
186 pub control_tx: mpsc::Sender<Event>,
187 pub swarm_rx: broadcast::Receiver<Event>,
188 pub swarm_tx: broadcast::Sender<Event>,
189 pub sserver_port: Option<u16>,
190 pub connected_exits: Arc<ConnectedExits>,
191}
192
193impl Handle {
194 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
195 self.swarm_tx.subscribe()
196 }
197}
198
199pub async fn start(
201 config: &Config,
202 registry: Arc<Mutex<Registry>>,
203) -> Result<Handle, Box<dyn std::error::Error>> {
204 let (swarm_tx, swarm_rx) = broadcast::channel::<Event>(8);
205 let (control_tx, mut control_rx) = mpsc::channel::<Event>(8);
206 let cancellation_token = CancellationToken::new();
207 let connections_map = Arc::new(ConnectionsMap::new());
208 let is_client = config.roles.contains(&Role::Client);
209 let is_exit = config.roles.contains(&Role::Exit);
210
211 let _ = tracing_subscriber::fmt()
212 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
213 .try_init();
214
215 let mut max_streams: usize = 512;
216
217 if config.fdlimit {
220 match fdlimit::raise_fd_limit() {
221 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
222 info!("raised fd limit from {:?} to {:?}", from, to);
223 println!("raised fd limit from {:?} to {:?}", from, to);
230 max_streams = to as usize;
232 }
233 _ => warn!("failed to raise fd limit"),
234 }
235 }
236
237 let (mut swarm, snowstorm_metrics, swarm_metrics, control_handle) = {
238 let mut metrics_registry = registry.lock().unwrap();
239
240 let metrics = metrics::Metrics::new(&mut metrics_registry);
242
243 let swarm_metrics = libp2p::metrics::Metrics::new(&mut metrics_registry);
245
246 let (s, control_handle) = build_swarm(
248 cancellation_token.clone(),
249 config.clone(),
250 &mut metrics_registry,
251 Some(max_streams),
252 );
253
254 (s, metrics, swarm_metrics, control_handle)
255 };
256
257 let mut control_stream_requests_rx = control_handle.next_stream_request().unwrap();
259
260 let snowstorm_metrics = Arc::new(snowstorm_metrics);
261
262 for addr in config.listen_on.iter() {
264 warn!("attempting to listen on {:?}", addr);
265 let result = swarm.listen_on(addr.clone());
266 if let Err(err) = result {
267 warn!("Failed to listen on multiaddr {:?}: {:?}", addr, err);
268 }
269 }
270
271 for addr in config.external_addrs.iter() {
272 swarm.add_external_address(addr.clone());
273 }
274
275 for addr in config.entry_peers.iter() {
276 let dial_result = swarm.dial(addr.clone());
277
278 match dial_result {
279 Ok(_) => info!("connected to entry peer: {:?}", addr),
280 Err(err) => warn!("failed to dial entry peer: {:?}", err),
281 }
282 }
283
284 let mut cookie: Option<libp2p::rendezvous::Cookie> = None;
285
286 if is_exit {
287 roles::exit::serve(
289 &mut swarm,
290 &config,
291 snowstorm_metrics.clone(),
292 cancellation_token.clone(),
293 connections_map.clone(),
294 );
295 }
296
297 let mut sserver_port = None;
298 let (connected_exits, mut exit_dials) = ConnectedExits::new(config.token.as_deref().unwrap_or("").to_string(), cancellation_token.clone());
299 let connected_exits = Arc::new(connected_exits);
300 let connected_exits2 = connected_exits.clone();
301
302 if is_client {
305 match roles::client::serve(
308 &mut swarm,
309 &config,
310 snowstorm_metrics.clone(),
311 cancellation_token.clone(),
312 connections_map.clone(),
313 connected_exits.clone(),
314 )
315 .await
316 {
317 Ok(port) => {
318 info!("[SDK] client role started on port: {}", port);
319 sserver_port = Some(port);
320 }
321 Err(e) => {
322 info!(
323 "[SDK] Error while starting client role - but continuing: {:?}",
324 e
325 );
326 }
329 }
330 }
331
332 let config = config.clone();
333 let swarm_tx_clone = swarm_tx.clone();
334 let mut stream_control = swarm.behaviour_mut().stream().unwrap().new_control();
335 let mut control_peer = control_peer::ControlPeer::new(stream_control.clone());
336
337 tokio::spawn(async move {
338 let config = config.clone();
339
340 loop {
341 tokio::select! {
342 event = swarm.select_next_some() => {
343 swarm_metrics.record(&event);
344 control_peer.on_swarm_event(&event);
345
346 if is_client {
348 connected_exits.on_swarm_event(&event);
349 }
350
351 match event {
352 SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
353 debug!("connection established: {:?}", peer_id);
354 let _ = swarm_tx_clone.send(Event::ConnectionEstablished(peer_id.clone()));
355 connections_map.connection_established(peer_id.clone(), &endpoint);
356
357 if endpoint.is_listener() {
358 let _ = swarm_tx_clone.send(Event::ListenerConnectionEstablished(peer_id.clone()));
359 } else if endpoint.is_dialer() {
360 let mad = endpoint.get_remote_address();
361 log::info!("dialer connection established: {:?}", mad);
362 let _ = swarm_tx_clone.send(Event::DialerConnectionEstablished(peer_id.clone(), is_snowflake(mad)));
363 }
364
365 if config.has_entry(&peer_id) {
366 debug!("peer is in entry peer list, sending rendezvous request");
367 register(&config, swarm.behaviour_mut(), peer_id.clone());
368
369 _ = endpoint;
370 }
376
377 search_role(&config, swarm.behaviour_mut(), Role::Volunteer, peer_id, cookie.clone());
378 search_role(&config, swarm.behaviour_mut(), Role::Exit, peer_id, cookie.clone());
379 },
380 SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
381 debug!("connection closed: {:?}", peer_id);
382 connections_map.connection_closed(&peer_id);
383 let _ = swarm_tx_clone.send(Event::ConnectionClosed(peer_id, cause.map(Into::into)));
384 },
385 SwarmEvent::NewListenAddr { address, .. } => {
386 debug!("new listen addr: {:?}", address);
387
388 if let Err(err) = swarm_tx_clone.send(Event::NewListenAddr(address)) {
389 error!("Failed to send event: {:?}", err);
390 }
391
392 if let Some(token) = &config.token {
393 if !is_client {
394 let device_id = config.device_id.clone();
395 let pid = swarm.local_peer_id();
396 let mas: Vec<&libp2p::Multiaddr> = swarm.external_addresses().map(|addr| addr).collect();
397 let _ = control::update_multiaddrs(token.to_string(), pid.clone().to_string(), device_id, mas, true).await;
398 }
399 }
400 },
401 SwarmEvent::Behaviour(SnowstormBehaviourEvent::Identify(libp2p::identify::Event::Received { info, .. })) => {
402 debug!("identify observed addr {:?}", info.observed_addr);
403
404 if let Some(token) = &config.token {
405 if !is_client {
408 let mut mas: Vec<Multiaddr> = swarm.listeners().map(Clone::clone).collect();
409 mas.push(info.observed_addr.clone());
410 let pid = swarm.local_peer_id();
411 let device_id = config.device_id.clone();
412 let _ = control::update_multiaddrs(token.to_string(), pid.clone().to_string(), device_id, mas.iter().collect(), false).await;
413 }
414 }
415 let event = Event::NewObservedAddr(info.observed_addr);
416 let _ = swarm_tx_clone.send(event);
417 },
418 SwarmEvent::Behaviour(SnowstormBehaviourEvent::RendezvousClient(libp2p::rendezvous::client::Event::Discovered {
419 registrations,
420 cookie: new_cookie,
421 ..
422 })) => {
423 cookie.replace(new_cookie);
424 debug!("rendezvous discovered: {:?}/{:?}", cookie.clone().unwrap().namespace(), registrations);
425 for registration in registrations {
426 info!("discovered rendezvous peer: {:?}", registration.record.peer_id());
427 let event = match cookie.clone().unwrap().namespace() {
428 Some(namespace) => {
429 if namespace.to_string() == role_to_namespace(&config, Role::Exit).to_string() {
430 Event::ExitDiscovered(registration.record.peer_id().clone())
431 } else {
432 Event::Discovered(registration.record.peer_id().clone())
433 }
434 },
435 None => Event::Discovered(registration.record.peer_id().clone()),
436 };
437
438 let _ = swarm_tx_clone.send(event);
439 }
440 },
441 _ => {}
442 }
443 }
444 ctrl_event = control_rx.recv() => {
445 match ctrl_event {
446 Some(Event::Stop) => {
447 debug!("stop received, breaking swarm loop");
448
449 cancellation_token.cancel();
451
452 let connected_peers: Vec<PeerId> = swarm.connected_peers().map(Clone::clone).collect();
454 for peer_id in connected_peers {
455 let _ = swarm.disconnect_peer_id(peer_id);
456 }
457
458 if let Some(token) = &config.token {
459 if !is_client {
460 let pid = swarm.local_peer_id();
461 let _ = control::unregister(token, Some(&pid.to_string()), None).await;
462 }
463 }
464
465 let _ = swarm_tx_clone.send(Event::Stop);
466 break;
467 }
468 _ => {}
469 }
470 }
471
472 Some(request) = control_stream_requests_rx.recv() => {
473 info!("ControlHandle: control stream request");
474 request.process(&mut stream_control).await;
475 }
476
477 Some((peer_id, addrs)) = exit_dials.recv() => {
478 log::info!("Gotta dial a peer! {:?} {:?}", peer_id, addrs);
479 let dial_opts = DialOpts::peer_id(peer_id).addresses(addrs).build();
480 let _ = swarm.dial(dial_opts);
481 }
482
483 event = control_peer.select_next_some() => {
484 match event {
485 ControlPeerEvent::DialNeeded { peer_id, address: mad} => {
486 debug!("control peer id: {:?}", peer_id);
487
488 let dial_opts = DialOpts::peer_id(peer_id)
489 .addresses(vec![
490 mad.clone(),
491 ])
492 .build();
493
494 match swarm.dial(dial_opts) {
495 Ok(_) => {
496 info!("dialed control peer: {:?}", mad);
497 }
498 Err(err) => {
499 error!("failed to dial control peer: {:?}", err);
500 }
501 }
502 }
503 ControlPeerEvent::Connected(peer_id) => {
504 control_handle.add_control_peer(peer_id);
505 },
506 ControlPeerEvent::Disconnected(peer_id) => {
507 control_handle.remove_control_peer(peer_id);
508 },
509 ControlPeerEvent::Notification(msg) => {
510 #[derive(Debug, Deserialize)]
511 struct SnowflakeConnectionRequest {
512 #[serde(rename = "ID")]
513 id: String,
514
515 #[serde(rename = "Offer")]
516 offer: String,
517 }
518
519
520 let command = match msg.get("Command") {
521 Some(cmd) => {
522 cmd.as_i64().unwrap()
523 }
524 None => {
525 error!("control peer notification missing command");
526 continue;
527 }
528 };
529
530 let _ = swarm_tx_clone.send(Event::NotificationReceived(command));
531
532 if command == 0 {
533 let payload = msg.get("Msg").unwrap().clone();
534 let request = serde_json::from_value::<SnowflakeConnectionRequest>(payload).unwrap();
535 info!("snowflake connection request: {:?}", request);
536
537 let mad = control_handle.register_offer(&request.id, &request.offer);
538
539 let r = swarm.listen_on(mad.clone());
540
541 match r {
542 Ok(_) => {
543 info!("listening on snowflake connection request: {:?}", mad);
544 }
545 Err(err) => {
546 error!("failed to listen on snowflake connection request: {:?}", err);
547 }
548 }
549 }
550 }
551 }
552 },
553 }
554 }
555 });
556
557 Ok(Handle {
558 control_tx,
559 swarm_rx,
560 swarm_tx,
561 sserver_port,
562 connected_exits: connected_exits2,
563 })
564}
565
566pub async fn stop(tx: mpsc::Sender<Event>) {
567 let _ = tx.send(Event::Stop).await;
568}
569
570pub fn try_peerid_from_multiaddr(ma: &Multiaddr) -> Option<PeerId> {
571 let mut peer_id = None;
572 for part in ma.iter() {
573 if let Protocol::P2p(id) = part {
574 peer_id = Some(id.clone());
575 break;
576 }
577 }
578 peer_id
579}
580
581pub fn is_snowflake(mad: &Multiaddr) -> bool {
582 match mad.iter().next() {
583 Some(Protocol::Dns4(hostname)) => {
584 hostname.ends_with(".snowflake") || hostname == "snowflake"
585 }
586 _ => false,
587 }
588}
589
590pub fn get_snowflake_id(mad: &Multiaddr) -> Option<String> {
591 match mad.iter().next() {
592 Some(Protocol::Dns4(hostname)) => {
593 if hostname.ends_with(".snowflake") {
594 let parts: Vec<&str> = hostname.split('.').collect();
595 return Some(parts[0].to_string());
596 }
597 }
598 _ => {}
599 }
600
601 None
602}
603
604pub fn snowflake_enrich_route_id(route_id: Option<&str>, mad: &Multiaddr) -> Multiaddr {
605 if !is_snowflake(mad) {
607 return mad.clone();
608 }
609
610 match route_id {
611 None => return mad.clone(),
612
613 Some(route_id) => {
614 let addr = format!("/dns4/{}.snowflake", route_id);
615
616 addr.parse().unwrap()
617 }
618 }
619}