From 47be2568ba76c43f22bb05e553710baf9f6f5a10 Mon Sep 17 00:00:00 2001 From: KernelErr <45716019+KernelErr@users.noreply.github.com> Date: Sun, 31 Oct 2021 19:21:32 +0800 Subject: [PATCH] Add upstream scheme support Need to implement TCP and UDP upstream support. --- Cargo.lock | 78 ++++++++++++++++++++++++++++- Cargo.toml | 3 +- README.md | 30 +++++------ example-config.yaml | 6 +-- src/config.rs | 99 ++++++++++++++++++++++++++++++++++--- src/servers/mod.rs | 11 +++-- src/servers/protocol/kcp.rs | 55 +++++++++++---------- src/servers/protocol/tcp.rs | 55 +++++++++++---------- tests/config.yaml | 6 +-- 9 files changed, 254 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4428f9..ebfc972 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,9 +96,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "form_urlencoded" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + [[package]] name = "fourth" -version = "0.1.3" +version = "0.1.4" dependencies = [ "byte_string", "bytes 1.1.0", @@ -110,6 +120,7 @@ dependencies = [ "serde_yaml", "tls-parser", "tokio", + "url", ] [[package]] @@ -241,6 +252,17 @@ dependencies = [ "quick-error", ] +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "1.7.0" @@ -318,6 +340,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + [[package]] name = "memchr" version = "2.4.1" @@ -453,6 +481,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + [[package]] name = "phf" version = "0.10.0" @@ -721,6 +755,21 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "tinyvec" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + [[package]] name = "tls-parser" version = "0.11.0" @@ -766,12 +815,39 @@ dependencies = [ "syn", ] +[[package]] +name = "unicode-bidi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" + +[[package]] +name = "unicode-normalization" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-xid" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "url" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +dependencies = [ + "form_urlencoded", + "idna", + "matches", + "percent-encoding", +] + [[package]] name = "version_check" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 84db5e2..83920ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fourth" -version = "0.1.3" +version = "0.1.4" edition = "2021" authors = ["LI Rui "] license = "Apache-2.0" @@ -22,6 +22,7 @@ serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8" futures = "0.3" tls-parser = "0.11" +url = "2.2.2" tokio = { version = "1.0", features = ["full"] } diff --git a/README.md b/README.md index e1030c3..9c9e9c2 100644 --- a/README.md +++ b/README.md @@ -35,39 +35,31 @@ $ cargo install fourth ## 配置 -Fourth使用yaml格式的配置文件,默认情况下会读取`/etc/fourth/config.yaml`,如下是一个示例配置。 +Fourth使用yaml格式的配置文件,默认情况下会读取`/etc/fourth/config.yaml`,如下是一个最小有效配置: ```yaml version: 1 log: info servers: - example_server: - listen: - - "0.0.0.0:443" - - "[::]:443" - tls: true # Enable TLS features like SNI filtering - sni: - proxy.example.com: proxy - www.example.com: nginx - default: ban proxy_server: listen: - "127.0.0.1:8081" default: remote - kcp_server: - protocol: kcp # default TCP - listen: - - "127.0.0.1:8082" - default: echo upstream: - nginx: "127.0.0.1:8080" - proxy: "127.0.0.1:1024" - remote: "www.remote.example.com:8082" # proxy to remote address + remote: "tcp://www.remote.example.com:8082" # proxy to remote address ``` -内置两个的upstream:ban(立即中断连接)、echo(返回读到的数据)。 +内置两个的upstream:ban(立即中断连接)、echo(返回读到的数据)。更详细的配置可以参考[示例配置](./example-config.yaml)。 + +## 性能测试 + +在4C2G的服务器上测试: + +使用Fourth代理到Nginx(直连QPS 120000): ~70000req/s (测试命令:`wrk -t200 -c1000 -d120s --latency http://proxy-server:8081 `) + +使用Fourth代理到本地iperf3:8Gbps ## io_uring? diff --git a/example-config.yaml b/example-config.yaml index c23dd68..edf5747 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -22,6 +22,6 @@ servers: default: echo upstream: - nginx: "127.0.0.1:8080" - proxy: "127.0.0.1:1024" - remote: "www.remote.example.com:8082" # proxy to remote address \ No newline at end of file + nginx: "tcp://127.0.0.1:8080" + proxy: "tcp://127.0.0.1:1024" + remote: "tcp://www.remote.example.com:8082" # proxy to remote address \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index ba2424d..5e21a29 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,10 +3,19 @@ use serde::Deserialize; use std::collections::HashMap; use std::fs::File; use std::io::{Error as IOError, Read}; +use url::Url; #[derive(Debug, Clone)] pub struct Config { - pub base: BaseConfig, + pub base: ParsedConfig, +} + +#[derive(Debug, Default, Deserialize, Clone)] +pub struct ParsedConfig { + pub version: i32, + pub log: Option, + pub servers: HashMap, + pub upstream: HashMap, } #[derive(Debug, Default, Deserialize, Clone)] @@ -26,6 +35,20 @@ pub struct ServerConfig { pub default: Option, } +#[derive(Debug, Clone, Deserialize)] +pub enum Upstream { + Ban, + Echo, + Custom(CustomUpstream), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct CustomUpstream { + pub name: String, + pub addr: String, + pub protocol: String, +} + #[derive(Debug)] pub enum ConfigError { IO(IOError), @@ -41,27 +64,89 @@ impl Config { } } -fn load_config(path: &str) -> Result { +fn load_config(path: &str) -> Result { let mut contents = String::new(); let mut file = (File::open(path))?; (file.read_to_string(&mut contents))?; - let parsed: BaseConfig = serde_yaml::from_str(&contents)?; + let base: BaseConfig = serde_yaml::from_str(&contents)?; - if parsed.version != 1 { + if base.version != 1 { return Err(ConfigError::Custom( "Unsupported config version".to_string(), )); } - let log_level = parsed.log.clone().unwrap_or_else(|| "info".to_string()); + let log_level = base.log.clone().unwrap_or_else(|| "info".to_string()); if !log_level.eq("disable") { std::env::set_var("FOURTH_LOG", log_level.clone()); pretty_env_logger::init_custom_env("FOURTH_LOG"); debug!("Set log level to {}", log_level); } - debug!("Config version {}", parsed.version); + debug!("Config version {}", base.version); + + let mut parsed_upstream: HashMap = HashMap::new(); + + for (name, upstream) in base.upstream.iter() { + let upstream_url = match Url::parse(upstream) { + Ok(url) => url, + Err(_) => { + return Err(ConfigError::Custom(format!( + "Invalid upstream url \"{}\"", + upstream + ))) + } + }; + + let upstream_host = match upstream_url.host_str() { + Some(host) => host, + None => { + return Err(ConfigError::Custom(format!( + "Invalid upstream url \"{}\"", + upstream + ))) + } + }; + + let upsteam_port = match upstream_url.port_or_known_default() { + Some(port) => port, + None => { + return Err(ConfigError::Custom(format!( + "Invalid upstream url \"{}\"", + upstream + ))) + } + }; + + parsed_upstream.insert( + name.to_string(), + Upstream::Custom(CustomUpstream { + name: name.to_string(), + addr: format!("{}:{}", upstream_host, upsteam_port), + protocol: upstream_url.scheme().to_string(), + }), + ); + } + + parsed_upstream.insert( + "ban".to_string(), + Upstream::Ban, + ); + + parsed_upstream.insert( + "echo".to_string(), + Upstream::Echo, + ); + + let parsed = ParsedConfig { + version: base.version, + log: base.log, + servers: base.servers, + upstream: parsed_upstream, + }; + + // ToDo: validate config Ok(parsed) } @@ -88,6 +173,6 @@ mod tests { assert_eq!(config.base.version, 1); assert_eq!(config.base.log.unwrap(), "disable"); assert_eq!(config.base.servers.len(), 5); - assert_eq!(config.base.upstream.len(), 3); + assert_eq!(config.base.upstream.len(), 3 + 2); // Add ban and echo upstreams } } diff --git a/src/servers/mod.rs b/src/servers/mod.rs index 39a6e4e..49c8cfe 100644 --- a/src/servers/mod.rs +++ b/src/servers/mod.rs @@ -5,13 +5,13 @@ use std::sync::Arc; use tokio::task::JoinHandle; mod protocol; -use crate::config::BaseConfig; +use crate::config::{ParsedConfig, Upstream}; use protocol::{kcp, tcp}; #[derive(Debug)] pub struct Server { pub proxies: Vec>, - pub config: BaseConfig, + pub config: ParsedConfig, } #[derive(Debug, Clone)] @@ -22,11 +22,11 @@ pub struct Proxy { pub tls: bool, pub sni: Option>, pub default: String, - pub upstream: HashMap, + pub upstream: HashMap, } impl Server { - pub fn new(config: BaseConfig) -> Self { + pub fn new(config: ParsedConfig) -> Self { let mut new_server = Server { proxies: Vec::new(), config: config.clone(), @@ -53,6 +53,7 @@ impl Server { continue; } }; + let proxy = Proxy { name: name.clone(), listen: listen_addr, @@ -103,7 +104,7 @@ impl Server { } #[cfg(test)] -mod test { +mod tests { use crate::plugins::kcp::{KcpConfig, KcpStream}; use std::net::SocketAddr; use std::thread::{self, sleep}; diff --git a/src/servers/protocol/kcp.rs b/src/servers/protocol/kcp.rs index cd3e0a8..195001f 100644 --- a/src/servers/protocol/kcp.rs +++ b/src/servers/protocol/kcp.rs @@ -1,3 +1,4 @@ +use crate::config::Upstream; use crate::plugins::kcp::{KcpConfig, KcpListener, KcpStream}; use crate::servers::Proxy; use futures::future::try_join; @@ -52,37 +53,41 @@ async fn accept( "No upstream named {:?} on server {:?}", proxy.default, proxy.name ); - return process(inbound, &proxy.default).await; + return process(inbound, proxy.upstream.get(&proxy.default).unwrap()).await; // ToDo: Remove unwrap and check default option } }; return process(inbound, upstream).await; } -async fn process(mut inbound: KcpStream, upstream: &str) -> Result<(), Box> { - if upstream == "ban" { - let _ = inbound.shutdown(); - return Ok(()); - } else if upstream == "echo" { - let (mut ri, mut wi) = io::split(inbound); - let inbound_to_inbound = copy(&mut ri, &mut wi); - let bytes_tx = inbound_to_inbound.await; - debug!("Bytes read: {:?}", bytes_tx); - return Ok(()); +async fn process(mut inbound: KcpStream, upstream: &Upstream) -> Result<(), Box> { + match upstream { + Upstream::Ban => { + let _ = inbound.shutdown(); + Ok(()) + } + Upstream::Echo => { + let (mut ri, mut wi) = io::split(inbound); + let inbound_to_inbound = copy(&mut ri, &mut wi); + let bytes_tx = inbound_to_inbound.await; + debug!("Bytes read: {:?}", bytes_tx); + Ok(()) + } + Upstream::Custom(custom) => { + let outbound = TcpStream::connect(custom.addr.clone()).await?; + + let (mut ri, mut wi) = io::split(inbound); + let (mut ro, mut wo) = io::split(outbound); + + let inbound_to_outbound = copy(&mut ri, &mut wo); + let outbound_to_inbound = copy(&mut ro, &mut wi); + + let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; + + debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); + + Ok(()) + } } - - let outbound = TcpStream::connect(upstream).await?; - - let (mut ri, mut wi) = io::split(inbound); - let (mut ro, mut wo) = io::split(outbound); - - let inbound_to_outbound = copy(&mut ri, &mut wo); - let outbound_to_inbound = copy(&mut ro, &mut wi); - - let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; - - debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); - - Ok(()) } async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result diff --git a/src/servers/protocol/tcp.rs b/src/servers/protocol/tcp.rs index 68d7a00..4988b56 100644 --- a/src/servers/protocol/tcp.rs +++ b/src/servers/protocol/tcp.rs @@ -1,3 +1,4 @@ +use crate::config::Upstream; use crate::servers::protocol::tls::get_sni; use crate::servers::Proxy; use futures::future::try_join; @@ -71,37 +72,41 @@ async fn accept(inbound: TcpStream, proxy: Arc) -> Result<(), Box Result<(), Box> { - if upstream == "ban" { - let _ = inbound.shutdown(); - return Ok(()); - } else if upstream == "echo" { - let (mut ri, mut wi) = io::split(inbound); - let inbound_to_inbound = copy(&mut ri, &mut wi); - let bytes_tx = inbound_to_inbound.await; - debug!("Bytes read: {:?}", bytes_tx); - return Ok(()); +async fn process(mut inbound: TcpStream, upstream: &Upstream) -> Result<(), Box> { + match upstream { + Upstream::Ban => { + let _ = inbound.shutdown(); + Ok(()) + } + Upstream::Echo => { + let (mut ri, mut wi) = io::split(inbound); + let inbound_to_inbound = copy(&mut ri, &mut wi); + let bytes_tx = inbound_to_inbound.await; + debug!("Bytes read: {:?}", bytes_tx); + Ok(()) + } + Upstream::Custom(custom) => { + let outbound = TcpStream::connect(custom.addr.clone()).await?; + + let (mut ri, mut wi) = io::split(inbound); + let (mut ro, mut wo) = io::split(outbound); + + let inbound_to_outbound = copy(&mut ri, &mut wo); + let outbound_to_inbound = copy(&mut ro, &mut wi); + + let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; + + debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); + + Ok(()) + } } - - let outbound = TcpStream::connect(upstream).await?; - - let (mut ri, mut wi) = io::split(inbound); - let (mut ro, mut wo) = io::split(outbound); - - let inbound_to_outbound = copy(&mut ri, &mut wo); - let outbound_to_inbound = copy(&mut ro, &mut wi); - - let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; - - debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); - - Ok(()) } async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result diff --git a/tests/config.yaml b/tests/config.yaml index fb3f1cd..42a1929 100644 --- a/tests/config.yaml +++ b/tests/config.yaml @@ -31,6 +31,6 @@ servers: default: echo upstream: - web: "127.0.0.1:8080" - proxy: "www.example.com:1024" - tester: "127.0.0.1:54599" \ No newline at end of file + web: "tcp://127.0.0.1:8080" + proxy: "tcp://www.example.com:1024" + tester: "tcp://127.0.0.1:54599" \ No newline at end of file