shadowquic/
lib.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use error::SError;
5use msgs::socks5::SocksAddr;
6use tokio::io::{AsyncRead, AsyncWrite};
7use tokio::net::TcpStream;
8
9use anyhow::Result;
10use async_trait::async_trait;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tracing::error;
13
14pub mod config;
15pub mod direct;
16pub mod error;
17pub mod msgs;
18pub mod shadowquic;
19pub mod socks;
20pub mod utils;
21pub enum ProxyRequest<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend> {
22    Tcp(TcpSession<T>),
23    Udp(UdpSession<I, O>),
24}
25/// Udp socket only use immutable reference to self
26/// So it can be safely wrapped by Arc and cloned to work in duplex way.
27#[async_trait]
28pub trait UdpSend: Send + Sync + Unpin {
29    async fn send_to(&self, buf: Bytes, addr: SocksAddr) -> Result<usize, SError>; // addr is proxy addr
30}
31#[async_trait]
32pub trait UdpRecv: Send + Sync + Unpin {
33    async fn recv_from(&mut self) -> Result<(Bytes, SocksAddr), SError>; // socksaddr is proxy addr
34}
35pub struct TcpSession<IO = AnyTcp> {
36    stream: IO,
37    dst: SocksAddr,
38}
39
40pub struct UdpSession<I = AnyUdpRecv, O = AnyUdpSend> {
41    recv: I,
42    send: O,
43    /// Control stream, should be kept alive during session.
44    stream: Option<AnyTcp>,
45    dst: SocksAddr,
46}
47
48pub type AnyTcp = Box<dyn TcpTrait>;
49pub type AnyUdpSend = Arc<dyn UdpSend>;
50pub type AnyUdpRecv = Box<dyn UdpRecv>;
51pub trait TcpTrait: AsyncRead + AsyncWrite + Unpin + Send + Sync {}
52impl TcpTrait for TcpStream {}
53
54#[async_trait]
55pub trait Inbound<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend>: Send + Sync + Unpin {
56    async fn accept(&mut self) -> Result<ProxyRequest<T, I, O>, SError>;
57    async fn init(&self) -> Result<(), SError> {
58        Ok(())
59    }
60}
61
62#[async_trait]
63pub trait Outbound<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend>: Send + Sync + Unpin {
64    async fn handle(&mut self, req: ProxyRequest<T, I, O>) -> Result<(), SError>;
65}
66
67#[async_trait]
68impl UdpSend for Sender<(Bytes, SocksAddr)> {
69    async fn send_to(&self, buf: Bytes, addr: SocksAddr) -> Result<usize, SError> {
70        let siz = buf.len();
71        self.send((buf, addr))
72            .await
73            .map_err(|_| SError::InboundUnavailable)?;
74        Ok(siz)
75    }
76}
77#[async_trait]
78impl UdpRecv for Receiver<(Bytes, SocksAddr)> {
79    async fn recv_from(&mut self) -> Result<(Bytes, SocksAddr), SError> {
80        let r = self.recv().await.ok_or(SError::OutboundUnavailable)?;
81        Ok(r)
82    }
83}
84pub struct Manager {
85    pub inbound: Box<dyn Inbound>,
86    pub outbound: Box<dyn Outbound>,
87}
88
89impl Manager {
90    pub async fn run(self) -> Result<(), SError> {
91        self.inbound.init().await?;
92        let mut inbound = self.inbound;
93        let mut outbound = self.outbound;
94        loop {
95            match inbound.accept().await {
96                Ok(req) => match outbound.handle(req).await {
97                    Ok(_) => {}
98                    Err(e) => {
99                        error!("error during handling request: {}", e)
100                    }
101                },
102                Err(e) => {
103                    error!("error during accepting request: {}", e)
104                }
105            }
106        }
107        #[allow(unreachable_code)]
108        Ok(())
109    }
110}