shadowquic/
lib.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use error::SError;
5use msgs::socks5::SocksAddr;
6use tokio::io::{AsyncRead, AsyncWrite};
7use tokio::net::TcpStream;
8
9use anyhow::Result;
10use async_trait::async_trait;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tracing::error;
13
14pub mod config;
15pub mod direct;
16pub mod error;
17pub mod msgs;
18pub mod quic;
19pub mod shadowquic;
20pub mod socks;
21pub mod squic;
22pub mod sunnyquic;
23pub mod utils;
24
25pub use msgs::SDecode;
26pub use msgs::SEncode;
27pub enum ProxyRequest<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend> {
28    Tcp(TcpSession<T>),
29    Udp(UdpSession<I, O>),
30}
31/// Udp socket only use immutable reference to self
32/// So it can be safely wrapped by Arc and cloned to work in duplex way.
33#[async_trait]
34pub trait UdpSend: Send + Sync + Unpin {
35    async fn send_to(&self, buf: Bytes, addr: SocksAddr) -> Result<usize, SError>; // addr is proxy addr
36}
37#[async_trait]
38pub trait UdpRecv: Send + Sync + Unpin {
39    async fn recv_from(&mut self) -> Result<(Bytes, SocksAddr), SError>; // socksaddr is proxy addr
40}
41pub struct TcpSession<IO = AnyTcp> {
42    stream: IO,
43    dst: SocksAddr,
44}
45
46pub struct UdpSession<I = AnyUdpRecv, O = AnyUdpSend> {
47    recv: I,
48    send: O,
49    /// Control stream, should be kept alive during session.
50    stream: Option<AnyTcp>,
51    bind_addr: SocksAddr,
52}
53
54pub type AnyTcp = Box<dyn TcpTrait>;
55pub type AnyUdpSend = Arc<dyn UdpSend>;
56pub type AnyUdpRecv = Box<dyn UdpRecv>;
57pub trait TcpTrait: AsyncRead + AsyncWrite + Unpin + Send + Sync {}
58impl TcpTrait for TcpStream {}
59
60#[async_trait]
61pub trait Inbound<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend>: Send + Sync + Unpin {
62    async fn accept(&mut self) -> Result<ProxyRequest<T, I, O>, SError>;
63    async fn init(&self) -> Result<(), SError> {
64        Ok(())
65    }
66}
67
68#[async_trait]
69pub trait Outbound<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend>: Send + Sync + Unpin {
70    async fn handle(&mut self, req: ProxyRequest<T, I, O>) -> Result<(), SError>;
71}
72
73#[async_trait]
74impl UdpSend for Sender<(Bytes, SocksAddr)> {
75    async fn send_to(&self, buf: Bytes, addr: SocksAddr) -> Result<usize, SError> {
76        let siz = buf.len();
77        self.send((buf, addr))
78            .await
79            .map_err(|_| SError::InboundUnavailable)?;
80        Ok(siz)
81    }
82}
83#[async_trait]
84impl UdpRecv for Receiver<(Bytes, SocksAddr)> {
85    async fn recv_from(&mut self) -> Result<(Bytes, SocksAddr), SError> {
86        let r = self.recv().await.ok_or(SError::OutboundUnavailable)?;
87        Ok(r)
88    }
89}
90pub struct Manager {
91    pub inbound: Box<dyn Inbound>,
92    pub outbound: Box<dyn Outbound>,
93}
94
95impl Manager {
96    pub async fn run(self) -> Result<(), SError> {
97        self.inbound.init().await?;
98        let mut inbound = self.inbound;
99        let mut outbound = self.outbound;
100        loop {
101            match inbound.accept().await {
102                Ok(req) => match outbound.handle(req).await {
103                    Ok(_) => {}
104                    Err(e) => {
105                        error!("error during handling request: {}", e)
106                    }
107                },
108                Err(e) => {
109                    error!("error during accepting request: {}", e)
110                }
111            }
112        }
113        #[allow(unreachable_code)]
114        Ok(())
115    }
116}