1use std::sync::Arc;
2
3use bytes::Bytes;
4use error::SError;
5use msgs::socks5::SocksAddr;
6use tokio::io::{AsyncRead, AsyncWrite};
7use tokio::net::TcpStream;
8
9use anyhow::Result;
10use async_trait::async_trait;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tracing::error;
13
14pub mod config;
15pub mod direct;
16pub mod error;
17pub mod msgs;
18pub mod shadowquic;
19pub mod socks;
20pub mod utils;
21pub enum ProxyRequest<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend> {
22 Tcp(TcpSession<T>),
23 Udp(UdpSession<I, O>),
24}
25#[async_trait]
28pub trait UdpSend: Send + Sync + Unpin {
29 async fn send_to(&self, buf: Bytes, addr: SocksAddr) -> Result<usize, SError>; }
31#[async_trait]
32pub trait UdpRecv: Send + Sync + Unpin {
33 async fn recv_from(&mut self) -> Result<(Bytes, SocksAddr), SError>; }
35pub struct TcpSession<IO = AnyTcp> {
36 stream: IO,
37 dst: SocksAddr,
38}
39
40pub struct UdpSession<I = AnyUdpRecv, O = AnyUdpSend> {
41 recv: I,
42 send: O,
43 stream: Option<AnyTcp>,
45 dst: SocksAddr,
46}
47
48pub type AnyTcp = Box<dyn TcpTrait>;
49pub type AnyUdpSend = Arc<dyn UdpSend>;
50pub type AnyUdpRecv = Box<dyn UdpRecv>;
51pub trait TcpTrait: AsyncRead + AsyncWrite + Unpin + Send + Sync {}
52impl TcpTrait for TcpStream {}
53
54#[async_trait]
55pub trait Inbound<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend>: Send + Sync + Unpin {
56 async fn accept(&mut self) -> Result<ProxyRequest<T, I, O>, SError>;
57 async fn init(&self) -> Result<(), SError> {
58 Ok(())
59 }
60}
61
62#[async_trait]
63pub trait Outbound<T = AnyTcp, I = AnyUdpRecv, O = AnyUdpSend>: Send + Sync + Unpin {
64 async fn handle(&mut self, req: ProxyRequest<T, I, O>) -> Result<(), SError>;
65}
66
67#[async_trait]
68impl UdpSend for Sender<(Bytes, SocksAddr)> {
69 async fn send_to(&self, buf: Bytes, addr: SocksAddr) -> Result<usize, SError> {
70 let siz = buf.len();
71 self.send((buf, addr))
72 .await
73 .map_err(|_| SError::InboundUnavailable)?;
74 Ok(siz)
75 }
76}
77#[async_trait]
78impl UdpRecv for Receiver<(Bytes, SocksAddr)> {
79 async fn recv_from(&mut self) -> Result<(Bytes, SocksAddr), SError> {
80 let r = self.recv().await.ok_or(SError::OutboundUnavailable)?;
81 Ok(r)
82 }
83}
84pub struct Manager {
85 pub inbound: Box<dyn Inbound>,
86 pub outbound: Box<dyn Outbound>,
87}
88
89impl Manager {
90 pub async fn run(self) -> Result<(), SError> {
91 self.inbound.init().await?;
92 let mut inbound = self.inbound;
93 let mut outbound = self.outbound;
94 loop {
95 match inbound.accept().await {
96 Ok(req) => match outbound.handle(req).await {
97 Ok(_) => {}
98 Err(e) => {
99 error!("error during handling request: {}", e)
100 }
101 },
102 Err(e) => {
103 error!("error during accepting request: {}", e)
104 }
105 }
106 }
107 #[allow(unreachable_code)]
108 Ok(())
109 }
110}