diff --git a/Cargo.lock b/Cargo.lock index f7c7817..b86446e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,6 +34,28 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "byte_string" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11aade7a05aa8c3a351cedc44c3fc45806430543382fcc4743a9b757a2a0b4ed" + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.1.0" @@ -69,22 +91,23 @@ checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" dependencies = [ "atty", "humantime", - "log", + "log 0.4.14", "regex", "termcolor", ] [[package]] name = "fourth" -version = "0.1.1" +version = "0.1.2" dependencies = [ "futures", - "log", + "log 0.4.14", "pretty_env_logger", "serde", "serde_yaml", "tls-parser", "tokio", + "tokio_kcp", ] [[package]] @@ -235,6 +258,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + +[[package]] +name = "kcp" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09481e52ffe09d417d8b770217faca77eeb048ab5f337562cede72070fc91b21" +dependencies = [ + "bytes 0.4.12", + "log 0.3.9", +] + [[package]] name = "libc" version = "0.2.103" @@ -256,6 +298,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" +dependencies = [ + "log 0.4.14", +] + [[package]] name = "log" version = "0.4.14" @@ -284,7 +335,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" dependencies = [ "libc", - "log", + "log 0.4.14", "miow", "ntapi", "winapi", @@ -463,7 +514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d" dependencies = [ "env_logger", - "log", + "log 0.4.14", ] [[package]] @@ -689,7 +740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" dependencies = [ "autocfg", - "bytes", + "bytes 1.1.0", "libc", "memchr", "mio", @@ -713,6 +764,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio_kcp" +version = "0.8.0" +source = "git+https://github.com/Matrix-Zhang/tokio_kcp?rev=d93a2f2#d93a2f2ad2cba731dce8490f0960246d2a655033" +dependencies = [ + "byte_string", + "bytes 1.1.0", + "futures", + "kcp", + "log 0.4.14", + "tokio", +] + [[package]] name = "unicode-xid" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index 494433c..9659251 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fourth" -version = "0.1.1" +version = "0.1.2" edition = "2021" authors = ["LI Rui "] license = "Apache-2.0" @@ -13,6 +13,8 @@ categories = ["web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +exclude = [".*"] + [dependencies] log = "0.4" pretty_env_logger = "0.4" @@ -21,4 +23,5 @@ serde_yaml = "0.8" futures = "0.3" tls-parser = "0.11" -tokio = { version = "1.0", features = ["full"] } \ No newline at end of file +tokio = { version = "1.0", features = ["full"] } +tokio_kcp = { git = "https://github.com/Matrix-Zhang/tokio_kcp", rev="d93a2f2" } \ No newline at end of file diff --git a/README-EN.md b/README-EN.md index d984d0b..3ea60c4 100644 --- a/README-EN.md +++ b/README-EN.md @@ -4,12 +4,13 @@ [![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml) -Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer data to remote addresses according to configuration. +Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP/KCP data to remote addresses(only TCP) according to configuration. ## Features - Listen on specific port and proxy to local or remote port - SNI-based rule without terminating TLS connection +- Allow TCP inbound ## Installation diff --git a/README.md b/README.md index 7e768b5..bf710f3 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,13 @@ [English](/README-EN.md) -Fourth是一个Rust实现的Layer 4代理,用于监听指定端口TCP流量,并根据规则转发到指定目标。 +Fourth是一个Rust实现的Layer 4代理,用于监听指定端口TCP/KCP流量,并根据规则转发到指定目标(目前只支持TCP)。 ## 功能 - 监听指定端口代理到本地或远端指定端口 - 监听指定端口,通过TLS ClientHello消息中的SNI进行分流 +- 支持KCP入站 ## 安装方法 @@ -37,20 +38,25 @@ servers: listen: - "0.0.0.0:443" - "[::]:443" - tls: true # 启动SNI分流,将根据TLS请求中的主机名分流 + tls: true # Enable TLS features like SNI filtering sni: proxy.example.com: proxy www.example.com: nginx default: ban - relay_server: + proxy_server: listen: - "127.0.0.1:8081" default: remote + kcp_server: + protocol: kcp # default TCP + listen: + - "127.0.0.1:8082" + default: echo upstream: nginx: "127.0.0.1:8080" proxy: "127.0.0.1:1024" - other: "www.remote.example.com:8082" # 代理到远端地址 + other: "www.remote.example.com:8082" # proxy to remote address ``` 内置两个的upstream:ban(立即中断连接)、echo(返回读到的数据)。 diff --git a/example-config.yaml b/example-config.yaml index e6ce61d..7a62da1 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -15,6 +15,11 @@ servers: listen: - "127.0.0.1:8081" default: remote + kcp_server: + protocol: kcp # default TCP + listen: + - "127.0.0.1:8082" + default: echo upstream: nginx: "127.0.0.1:8080" diff --git a/src/config.rs b/src/config.rs index 01f766c..9a5d10a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,6 +20,7 @@ pub struct BaseConfig { #[derive(Debug, Default, Deserialize, Clone)] pub struct ServerConfig { pub listen: Vec, + pub protocol: Option, pub tls: Option, pub sni: Option>, pub default: Option, @@ -86,7 +87,7 @@ mod tests { let config = Config::new("tests/config.yaml").unwrap(); assert_eq!(config.base.version, 1); assert_eq!(config.base.log.unwrap(), "disable"); - assert_eq!(config.base.servers.len(), 2); - assert_eq!(config.base.upstream.len(), 2); + assert_eq!(config.base.servers.len(), 4); + assert_eq!(config.base.upstream.len(), 3); } } diff --git a/src/servers/mod.rs b/src/servers/mod.rs index 3f83ec4..87189b2 100644 --- a/src/servers/mod.rs +++ b/src/servers/mod.rs @@ -1,16 +1,12 @@ -use futures::future::try_join; -use log::{debug, error, info, warn}; +use log::{error, info}; use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; -use tokio::io; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinHandle; -mod tls; -use self::tls::get_sni; +mod protocol; use crate::config::BaseConfig; +use protocol::{kcp, tcp}; #[derive(Debug)] pub struct Server { @@ -22,6 +18,7 @@ pub struct Server { pub struct Proxy { pub name: String, pub listen: SocketAddr, + pub protocol: String, pub tls: bool, pub sni: Option>, pub default: String, @@ -36,6 +33,7 @@ impl Server { }; for (name, proxy) in config.servers.iter() { + let protocol = proxy.protocol.clone().unwrap_or_else(|| "tcp".to_string()); let tls = proxy.tls.unwrap_or(false); let sni = proxy.sni.clone(); let default = proxy.default.clone().unwrap_or_else(|| "ban".to_string()); @@ -48,7 +46,6 @@ impl Server { upstream_set.insert(key.clone()); } for listen in proxy.listen.clone() { - println!("{:?}", listen); let listen_addr: SocketAddr = match listen.parse() { Ok(addr) => addr, Err(_) => { @@ -59,6 +56,7 @@ impl Server { let proxy = Proxy { name: name.clone(), listen: listen_addr, + protocol: protocol.clone(), tls, sni: sni.clone(), default: default.clone(), @@ -77,9 +75,22 @@ impl Server { let mut handles: Vec> = Vec::new(); for config in proxies { - info!("Starting server {} on {}", config.name, config.listen); + info!( + "Starting {} server {} on {}", + config.protocol, config.name, config.listen + ); let handle = tokio::spawn(async move { - let _ = proxy(config).await; + match config.protocol.as_ref() { + "tcp" => { + let _ = tcp::proxy(config).await; + } + "kcp" => { + let _ = kcp::proxy(config).await; + } + _ => { + error!("Invalid protocol: {}", config.protocol) + } + } }); handles.push(handle); } @@ -91,131 +102,48 @@ impl Server { } } -async fn proxy(config: Arc) -> Result<(), Box> { - let listener = TcpListener::bind(config.listen).await?; - let config = config.clone(); - - loop { - let thread_proxy = config.clone(); - match listener.accept().await { - Err(err) => { - error!("Failed to accept connection: {}", err); - return Err(Box::new(err)); - } - Ok((stream, _)) => { - tokio::spawn(async move { - match accept(stream, thread_proxy).await { - Ok(_) => {} - Err(err) => { - error!("Relay thread returned an error: {}", err); - } - }; - }); - } - } - } -} - -async fn accept(inbound: TcpStream, proxy: Arc) -> Result<(), Box> { - debug!("New connection from {:?}", inbound.peer_addr()?); - - let upstream_name = match proxy.tls { - false => proxy.default.clone(), - true => { - let mut hello_buf = [0u8; 1024]; - inbound.peek(&mut hello_buf).await?; - let snis = get_sni(&hello_buf); - if snis.is_empty() { - proxy.default.clone() - } else { - match proxy.sni.clone() { - Some(sni_map) => { - let mut upstream = proxy.default.clone(); - for sni in snis { - let m = sni_map.get(&sni); - if m.is_some() { - upstream = m.unwrap().clone(); - break; - } - } - upstream - } - None => proxy.default.clone(), - } - } - } - }; - - debug!("Upstream: {}", upstream_name); - - let upstream = match proxy.upstream.get(&upstream_name) { - Some(upstream) => upstream, - None => { - warn!( - "No upstream named {:?} on server {:?}", - proxy.default, proxy.name - ); - return process(inbound, &proxy.default).await; - } - }; - return process(inbound, upstream).await; -} - -async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box> { - if upstream == "ban" { - let _ = inbound.shutdown(); - return Ok(()); - } else if upstream == "echo" { - loop { - let mut buf = [0u8; 1]; - let b = inbound.read(&mut buf).await?; - if b == 0 { - break; - } else { - inbound.write(&buf).await?; - } - } - return Ok(()); - } - - let outbound = TcpStream::connect(upstream).await?; - - let (mut ri, mut wi) = io::split(inbound); - let (mut ro, mut wo) = io::split(outbound); - - let inbound_to_outbound = copy(&mut ri, &mut wo); - let outbound_to_inbound = copy(&mut ro, &mut wi); - - let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; - - debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); - - Ok(()) -} - -async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result -where - R: AsyncRead + Unpin + ?Sized, - W: AsyncWrite + Unpin + ?Sized, -{ - match io::copy(reader, writer).await { - Ok(u64) => { - let _ = writer.shutdown().await; - Ok(u64) - } - Err(_) => Ok(0), - } -} - #[cfg(test)] mod test { + use std::net::SocketAddr; use std::thread::{self, sleep}; use std::time::Duration; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::{TcpListener, TcpStream}; + use tokio_kcp::{KcpConfig, KcpStream}; use super::*; + #[tokio::main] + async fn tcp_mock_server() { + let server_addr: SocketAddr = "127.0.0.1:54599".parse().unwrap(); + let listener = TcpListener::bind(server_addr).await.unwrap(); + let (mut stream, _) = listener.accept().await.unwrap(); + stream.write(b"hello").await.unwrap(); + stream.shutdown().await.unwrap(); + } + #[tokio::test] - async fn test_echo_server() { + async fn test_tcp_proxy() { + use crate::config::Config; + let config = Config::new("tests/config.yaml").unwrap(); + let mut server = Server::new(config.base); + thread::spawn(move || { + tcp_mock_server(); + }); + sleep(Duration::from_secs(1)); // wait for server to start + thread::spawn(move || { + let _ = server.run(); + }); + sleep(Duration::from_secs(1)); // wait for server to start + let mut conn = TcpStream::connect("127.0.0.1:54500").await.unwrap(); + let mut buf = [0u8; 5]; + conn.read(&mut buf).await.unwrap(); + assert_eq!(&buf, b"hello"); + conn.shutdown().await.unwrap(); + } + + #[tokio::test] + async fn test_tcp_echo_server() { use crate::config::Config; let config = Config::new("tests/config.yaml").unwrap(); let mut server = Server::new(config.base); @@ -232,4 +160,25 @@ mod test { } conn.shutdown().await.unwrap(); } + + #[tokio::test] + async fn test_kcp_echo_server() { + use crate::config::Config; + let config = Config::new("tests/config.yaml").unwrap(); + let mut server = Server::new(config.base); + thread::spawn(move || { + let _ = server.run(); + }); + sleep(Duration::from_secs(1)); // wait for server to start + let kcp_config = KcpConfig::default(); + let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap(); + let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap(); + let mut buf = [0u8; 1]; + for i in 0..=10u8 { + conn.write(&[i]).await.unwrap(); + conn.read(&mut buf).await.unwrap(); + assert_eq!(&buf, &[i]); + } + conn.shutdown().await.unwrap(); + } } diff --git a/src/servers/protocol/kcp.rs b/src/servers/protocol/kcp.rs new file mode 100644 index 0000000..47b769f --- /dev/null +++ b/src/servers/protocol/kcp.rs @@ -0,0 +1,100 @@ +use crate::servers::Proxy; +use futures::future::try_join; +use log::{debug, error, warn}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::io; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio_kcp::{KcpConfig, KcpListener, KcpStream}; + +pub async fn proxy(config: Arc) -> Result<(), Box> { + let kcp_config = KcpConfig::default(); + let mut listener = KcpListener::bind(kcp_config, config.listen).await?; + let config = config.clone(); + + loop { + let thread_proxy = config.clone(); + match listener.accept().await { + Err(err) => { + error!("Failed to accept connection: {}", err); + return Err(Box::new(err)); + } + Ok((stream, peer)) => { + tokio::spawn(async move { + match accept(stream, peer, thread_proxy).await { + Ok(_) => {} + Err(err) => { + error!("Relay thread returned an error: {}", err); + } + }; + }); + } + } + } +} + +async fn accept( + inbound: KcpStream, + peer: SocketAddr, + proxy: Arc, +) -> Result<(), Box> { + debug!("New connection from {:?}", peer); + + let upstream_name = proxy.default.clone(); + + debug!("Upstream: {}", upstream_name); + + let upstream = match proxy.upstream.get(&upstream_name) { + Some(upstream) => upstream, + None => { + warn!( + "No upstream named {:?} on server {:?}", + proxy.default, proxy.name + ); + return process(inbound, &proxy.default).await; + } + }; + return process(inbound, upstream).await; +} + +async fn process(mut inbound: KcpStream, upstream: &str) -> Result<(), Box> { + if upstream == "ban" { + let _ = inbound.shutdown(); + return Ok(()); + } else if upstream == "echo" { + let (mut ri, mut wi) = io::split(inbound); + let inbound_to_inbound = copy(&mut ri, &mut wi); + let bytes_tx = inbound_to_inbound.await; + debug!("Bytes read: {:?}", bytes_tx); + return Ok(()); + } + + let outbound = TcpStream::connect(upstream).await?; + + let (mut ri, mut wi) = io::split(inbound); + let (mut ro, mut wo) = io::split(outbound); + + let inbound_to_outbound = copy(&mut ri, &mut wo); + let outbound_to_inbound = copy(&mut ro, &mut wi); + + let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; + + debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); + + Ok(()) +} + +async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result +where + R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + match io::copy(reader, writer).await { + Ok(u64) => { + let _ = writer.shutdown().await; + Ok(u64) + } + Err(_) => Ok(0), + } +} diff --git a/src/servers/protocol/mod.rs b/src/servers/protocol/mod.rs new file mode 100644 index 0000000..014642a --- /dev/null +++ b/src/servers/protocol/mod.rs @@ -0,0 +1,3 @@ +pub mod kcp; +pub mod tcp; +pub mod tls; diff --git a/src/servers/protocol/tcp.rs b/src/servers/protocol/tcp.rs new file mode 100644 index 0000000..68d7a00 --- /dev/null +++ b/src/servers/protocol/tcp.rs @@ -0,0 +1,119 @@ +use crate::servers::protocol::tls::get_sni; +use crate::servers::Proxy; +use futures::future::try_join; +use log::{debug, error, warn}; +use std::sync::Arc; +use tokio::io; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; + +pub async fn proxy(config: Arc) -> Result<(), Box> { + let listener = TcpListener::bind(config.listen).await?; + let config = config.clone(); + + loop { + let thread_proxy = config.clone(); + match listener.accept().await { + Err(err) => { + error!("Failed to accept connection: {}", err); + return Err(Box::new(err)); + } + Ok((stream, _)) => { + tokio::spawn(async move { + match accept(stream, thread_proxy).await { + Ok(_) => {} + Err(err) => { + error!("Relay thread returned an error: {}", err); + } + }; + }); + } + } + } +} + +async fn accept(inbound: TcpStream, proxy: Arc) -> Result<(), Box> { + debug!("New connection from {:?}", inbound.peer_addr()?); + + let upstream_name = match proxy.tls { + false => proxy.default.clone(), + true => { + let mut hello_buf = [0u8; 1024]; + inbound.peek(&mut hello_buf).await?; + let snis = get_sni(&hello_buf); + if snis.is_empty() { + proxy.default.clone() + } else { + match proxy.sni.clone() { + Some(sni_map) => { + let mut upstream = proxy.default.clone(); + for sni in snis { + let m = sni_map.get(&sni); + if m.is_some() { + upstream = m.unwrap().clone(); + break; + } + } + upstream + } + None => proxy.default.clone(), + } + } + } + }; + + debug!("Upstream: {}", upstream_name); + + let upstream = match proxy.upstream.get(&upstream_name) { + Some(upstream) => upstream, + None => { + warn!( + "No upstream named {:?} on server {:?}", + proxy.default, proxy.name + ); + return process(inbound, &proxy.default).await; + } + }; + return process(inbound, upstream).await; +} + +async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box> { + if upstream == "ban" { + let _ = inbound.shutdown(); + return Ok(()); + } else if upstream == "echo" { + let (mut ri, mut wi) = io::split(inbound); + let inbound_to_inbound = copy(&mut ri, &mut wi); + let bytes_tx = inbound_to_inbound.await; + debug!("Bytes read: {:?}", bytes_tx); + return Ok(()); + } + + let outbound = TcpStream::connect(upstream).await?; + + let (mut ri, mut wi) = io::split(inbound); + let (mut ro, mut wo) = io::split(outbound); + + let inbound_to_outbound = copy(&mut ri, &mut wo); + let outbound_to_inbound = copy(&mut ro, &mut wi); + + let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; + + debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); + + Ok(()) +} + +async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result +where + R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + match io::copy(reader, writer).await { + Ok(u64) => { + let _ = writer.shutdown().await; + Ok(u64) + } + Err(_) => Ok(0), + } +} diff --git a/src/servers/tls.rs b/src/servers/protocol/tls.rs similarity index 100% rename from src/servers/tls.rs rename to src/servers/protocol/tls.rs diff --git a/tests/config.yaml b/tests/config.yaml index 08709a6..b906374 100644 --- a/tests/config.yaml +++ b/tests/config.yaml @@ -15,7 +15,17 @@ servers: listen: - "0.0.0.0:54956" default: echo + tcp_server: + listen: + - "127.0.0.1:54500" + default: tester + kcp_server: + protocol: kcp + listen: + - "127.0.0.1:54959" + default: echo upstream: web: "127.0.0.1:8080" - proxy: "www.example.com:1024" \ No newline at end of file + proxy: "www.example.com:1024" + tester: "127.0.0.1:54599" \ No newline at end of file