snowstorm/net/endpoint/
mod.rs

1/**
2 * Snowstorm SDK - Net Endpoint
3 *
4 * © Snowstorm Inc 2025+
5 */
6use etherparse::{IcmpEchoHeader, Icmpv4Header, IpNumber};
7use ipnet::{Ipv4Net, Ipv6Net};
8#[allow(unused_imports)]
9use ipstack::{
10    stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport},
11    IpStackConfig,
12};
13use log::{debug, warn};
14use shadowsocks::{
15    config::ServerType, context::Context, crypto::CipherKind, net::TcpStream, ProxyClientStream,
16    ServerConfig,
17};
18use std::{
19    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
20    time::Duration,
21};
22
23use tokio::sync::watch;
24use tokio::{
25    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
26    select,
27};
28use tokio_util::{compat::FuturesAsyncReadCompatExt, sync::CancellationToken};
29
30#[cfg(not(target_os = "windows"))]
31use std::os::unix::io::RawFd;
32
33#[cfg(not(target_os = "windows"))]
34use tunio::{
35    traits::{DriverT, InterfaceT},
36    DefaultAsyncInterface, DefaultDriver,
37};
38
39#[cfg(feature = "snowstorm_client_udp")]
40use {
41    shadowsocks::{config::Mode, relay::Address, ProxySocket},
42    tokio::io::AsyncReadExt,
43};
44
45const MTU: u16 = 1500;
46
47#[cfg(not(target_os = "windows"))]
48pub async fn new_client(
49    raw_fd: RawFd,
50    token: CancellationToken,
51    sserver_port: watch::Receiver<u16>,
52) -> Result<(), Box<dyn std::error::Error>> {
53    let mut tun_driver = DefaultDriver::new()?;
54    let tun_config = DefaultAsyncInterface::config_builder()
55        .name("utun321".to_string()) // OSX naming convention.
56        .platform(|mut b| b.raw_fd(Some(raw_fd)).build())
57        .unwrap()
58        .build()?;
59
60    // Create the tun device.
61    debug!("tun");
62    let tun_dev = DefaultAsyncInterface::new_up(&mut tun_driver, tun_config)?;
63
64    return new_client_with_device(tun_dev.compat(), token, sserver_port).await;
65}
66
67pub async fn new_client_with_device<D>(
68    device: D,
69    token: CancellationToken,
70    sserver_port: watch::Receiver<u16>,
71) -> Result<(), Box<dyn std::error::Error>>
72where
73    D: AsyncRead + AsyncWrite + Unpin + Send + 'static,
74{
75    let ipstack_config = IpStackConfig {
76        mtu: MTU,
77        packet_information: cfg!(any(target_os = "macos", target_os = "ios")),
78        tcp_timeout: Duration::from_secs(60),
79        udp_timeout: Duration::from_secs(30),
80    };
81
82    // Start pushing packets back and forth.
83    debug!("ipstack");
84    let ip_stack = ipstack::IpStack::new(ipstack_config, device);
85
86    // The sidecar curently hardcodes the addreses, we should
87    // just use `tun_dev.handle()` to query the address instead.
88    let if_net4: Ipv4Net = "192.18.0.1/24".parse().unwrap();
89    let if_addr4 = if_net4.addr();
90    let if_net6: Ipv6Net = "2001::1/64".parse().unwrap();
91    let if_addr6 = if_net6.addr();
92
93    debug!("task");
94    tokio::spawn(async move {
95        let _ = ipstack_worker(ip_stack, if_addr4, if_addr6, token, sserver_port).await;
96    });
97
98    Ok(())
99}
100
101async fn ipstack_worker(
102    mut ip_stack: ipstack::IpStack,
103    if_addr4: Ipv4Addr,
104    if_addr6: Ipv6Addr,
105    token: CancellationToken,
106    sserver_port: watch::Receiver<u16>,
107) -> Result<(), Box<dyn std::error::Error>> {
108    loop {
109        let stream = select! {
110            _ = token.cancelled() => {
111                debug!("endpoint: shutdown requested");
112                break;
113            }
114            result = ip_stack.accept() => {
115                match result {
116                    Ok(stream) => {
117                        stream
118                    }
119                    Err(e) => {
120                        warn!("endpoint: ipstack accept failed: {}", e);
121                        break;
122                    }
123                }
124            }
125        };
126
127        let ss_ctx = Context::new_shared(ServerType::Local);
128        let sserver_port = sserver_port.borrow().clone();
129        let ss_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), sserver_port);
130        #[allow(unused_mut)]
131        let mut ss_cfg = ServerConfig::new(ss_addr, "", CipherKind::NONE)?;
132        #[cfg(feature = "snowstorm_client_udp")]
133        ss_cfg.set_mode(Mode::TcpAndUdp);
134
135        match stream {
136            IpStackStream::Tcp(tcp) => {
137                let dst_addr = tcp.peer_addr();
138
139                debug!("TCP: New connection to {:?}", dst_addr);
140
141                let s = match ProxyClientStream::connect(ss_ctx.clone(), &ss_cfg, dst_addr).await {
142                    Ok(s) => s,
143                    Err(e) => {
144                        warn!("TCP: failed to connect to {:?}: {:?}", dst_addr, e);
145                        continue;
146                    }
147                };
148
149                tokio::spawn(async move {
150                    match tcp_stream_worker(tcp, s, dst_addr).await {
151                        Ok(_) => {}
152                        Err(e) => {
153                            warn!("TCP: stream failed {:?}: {:?}", dst_addr, e);
154                        }
155                    };
156                });
157            }
158            IpStackStream::Udp(udp) => {
159                let dst_addr = udp.peer_addr();
160
161                #[cfg(not(feature = "snowstorm_client_udp"))]
162                #[cfg(not(target_os = "android"))]
163                debug!("UDP: Ignoring packet to {:?}", dst_addr);
164
165                // Android is not great at falling back from regular UDP-powered DNS, same goes for Windows
166                // So for the time being we relay DoH to google's DNS servers.
167                #[cfg(not(feature = "snowstorm_client_udp"))]
168                #[cfg(any(target_os = "android", target_os = "windows"))]
169                if dst_addr.port() == 53 {
170                    debug!("Android DNS relay via DoH");
171                    tokio::spawn(async move {
172                        // Ignoring the error here, since most of the "streams" will inevitably timeout.
173                        // We usually get one read per "stream".
174                        let _ = udp_doh_relay_worker(udp).await;
175                    });
176                }
177
178                #[cfg(feature = "snowstorm_client_udp")]
179                {
180                    debug!("UDP: New packet stream to {:?}", dst_addr);
181
182                    let s = match ProxySocket::connect(ss_ctx.clone(), &ss_cfg).await {
183                        Ok(s) => s,
184                        Err(e) => {
185                            warn!("UDP: failed to connect to {:?}: {:?}", dst_addr, e);
186                            continue;
187                        }
188                    };
189
190                    tokio::spawn(async move {
191                        match udp_stream_worker(udp, s, dst_addr).await {
192                            Ok(_) => {}
193                            Err(e) => {
194                                warn!("UDP: stream failed {:?}: {:?}", dst_addr, e);
195                            }
196                        };
197                    });
198                }
199            }
200            IpStackStream::UnknownTransport(u) => {
201                handle_unknown_transport(u, if_addr4, if_addr6)?;
202            }
203            IpStackStream::UnknownNetwork(pkt) => {
204                debug!("Unknown: Not even IP? - {} bytes", pkt.len());
205            }
206        };
207    }
208
209    // Stop the ip_stack worker.
210    debug!("ipstack abort");
211    ip_stack.handle.abort();
212    debug!("ipstack aborted");
213
214    // XXX: We may need to close the fd out from underneath
215    // the tunio crate...
216
217    Ok(())
218}
219
220async fn tcp_stream_worker(
221    local_tcp: IpStackTcpStream,
222    remote_tcp: ProxyClientStream<TcpStream>,
223    dst_addr: SocketAddr,
224) -> Result<(), Box<dyn std::error::Error>> {
225    debug!("TCP: started worker: {:?}", dst_addr);
226
227    let (mut l_rx, mut l_tx) = tokio::io::split(local_tcp);
228    let (mut r_rx, mut r_tx) = tokio::io::split(remote_tcp);
229
230    // Per crates/shadowsocks_service/src/local/utils.rs, there
231    // are server-speaks first protocols like FTP.  The helper
232    // to deal with this is naturally `pub(crate)`.
233    let _ = r_tx.write(&[]).await;
234
235    let _ = tokio::join!(
236        async move {
237            let r = tokio::io::copy(&mut l_rx, &mut r_tx).await;
238            let _ = r_tx.shutdown().await;
239            r
240        },
241        async move {
242            let r = tokio::io::copy(&mut r_rx, &mut l_tx).await;
243            let _ = l_tx.shutdown().await;
244            r
245        }
246    );
247
248    debug!("TCP: Finished connection to {:?}", dst_addr);
249
250    Ok(())
251}
252
253#[cfg(not(feature = "snowstorm_client_udp"))]
254#[cfg(any(target_os = "android", target_os = "windows"))]
255async fn udp_doh_relay_worker(mut udp: IpStackUdpStream) -> Result<(), Box<dyn std::error::Error>> {
256    use reqwest::Client;
257    use tokio::io::{AsyncReadExt, AsyncWriteExt};
258
259    let google_dns_ips = ["8.8.8.8:443", "8.8.4.4:443"].map(|ip| ip.parse().unwrap());
260
261    let client = Client::builder()
262        .resolve_to_addrs("dns.google", &google_dns_ips)
263        .build()?;
264
265    let mut buf = vec![0; 2048];
266    loop {
267        let len = udp.read(&mut buf).await?;
268        if len == 0 {
269            return Ok(());
270        }
271
272        let dns_query = &buf[..len];
273
274        let response = client
275            .post("https://dns.google/dns-query")
276            .header("Content-Type", "application/dns-message")
277            .body(dns_query.to_vec())
278            .send()
279            .await?;
280
281        if response.status().is_success() {
282            let dns_response = response.bytes().await?;
283            udp.write_all(&dns_response).await?;
284        } else {
285            warn!("DoH: failed to relay DNS query: {:?}", response.status());
286        }
287    }
288}
289
290// TODO: In theory there is only a need for a single ProxySocket that is
291// shared across all UDP packet streams, since the ShadowSocks protocol
292// can handle multiplexing.  Doing so would be more efficient, but this
293// is a hell of a lot simpler.
294//
295// This is heavily inspired by https://github.com/tun2proxy/tun2proxy/blob/master/src/lib.rs
296#[cfg(feature = "snowstorm_client_udp")]
297async fn udp_stream_worker(
298    mut local_udp: IpStackUdpStream,
299    remote_udp: ProxySocket,
300    dst_addr: SocketAddr,
301) -> Result<(), Box<dyn std::error::Error>> {
302    const UDP_MSS: usize = 8192; // 64 KiB in theory.
303
304    let mut l2s_buf = [0_u8; UDP_MSS as usize];
305    let mut s2l_buf = [0_u8; UDP_MSS as usize];
306
307    let ss_dst_addr = Address::from(dst_addr);
308
309    debug!("UDP: started worker: {:?}", dst_addr);
310
311    loop {
312        tokio::select! {
313            len = local_udp.read(&mut l2s_buf) => {
314                let len = len?;
315                if len == 0 {
316                    break;
317                }
318                let pkt = &l2s_buf[..len];
319
320                debug!("UDP: local->remote: {:?} {} bytes", dst_addr, len);
321
322                remote_udp.send(&ss_dst_addr, pkt).await?;
323            }
324            res = remote_udp.recv(&mut s2l_buf) => {
325                let len = res?.0;
326                if len == 0 {
327                    break;
328                }
329                let pkt = &s2l_buf[..len];
330
331                debug!("UDP: remote->local: {:?} {} bytes", dst_addr, len);
332
333                local_udp.write_all(pkt).await?;
334            }
335        }
336    }
337
338    debug!("UDP: Finished connection to {:?}", dst_addr);
339
340    Ok(())
341}
342
343fn handle_unknown_transport(
344    u: IpStackUnknownTransport,
345    if_addr4: Ipv4Addr,
346    _if_addr6: Ipv6Addr,
347) -> Result<(), Box<dyn std::error::Error>> {
348    let dst_addr = u.dst_addr();
349
350    if u.ip_protocol() != IpNumber::ICMP {
351        debug!("IP: {:?} {:?}", dst_addr, u.ip_protocol());
352        return Ok(());
353    }
354
355    // TODO: This could do IPv6, but the response header checksum
356    // covers the src and destination addresses, and it's not obvious
357    // to me how to convert from the rust Ipv6Addr type to a `[u8; 16]`.
358    if !u.src_addr().is_ipv4() {
359        debug!("ICMP: {:?} {:?}", dst_addr, u.ip_protocol());
360        return Ok(());
361    }
362
363    let (icmp_header, req_payload) = Icmpv4Header::from_slice(u.payload())?;
364    if let etherparse::Icmpv4Type::EchoRequest(req) = icmp_header.icmp_type {
365        debug!("ICMPv4: echo {:?}", dst_addr);
366
367        if dst_addr != if_addr4 {
368            return Ok(());
369        }
370
371        let echo = IcmpEchoHeader {
372            id: req.id,
373            seq: req.seq,
374        };
375        let mut resp = Icmpv4Header::new(etherparse::Icmpv4Type::EchoReply(echo));
376        resp.update_checksum(req_payload);
377        let mut payload = resp.to_bytes().to_vec();
378        payload.extend_from_slice(req_payload);
379        u.send(payload)?;
380    } else {
381        debug!("ICMPv4: {:?} {:?}", icmp_header.icmp_type, dst_addr);
382    }
383
384    Ok(())
385}