13 Commits

Author SHA1 Message Date
f4bc441ca8 Enable explicit ipv4 / ipv6 proxying
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-06-02 17:35:29 +02:00
f010f8c76b Update dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-06-02 17:35:13 +02:00
KernelErr
8fbc0c370a Add error messages when failed to start server 2021-12-30 22:05:25 +08:00
KernelErr
bff92738d5 Allow config path from FOURTH_CONFIG 2021-11-01 16:06:47 +08:00
KernelErr
754a5af794 Add publish CI and run fmt 2021-11-01 15:56:57 +08:00
KernelErr
fc7a3038bd Add unknown protocol error 2021-11-01 15:32:08 +08:00
KernelErr
8a96de9666 Update README and minor refactor 2021-11-01 15:25:12 +08:00
KernelErr
0407f4b40c Add config validation 2021-11-01 13:45:47 +08:00
KernelErr
47be2568ba Add upstream scheme support
Need to implement TCP and UDP upstream support.
2021-10-31 19:21:32 +08:00
KernelErr
5944beb6a2 Combine TCP and KCP tests 2021-10-27 08:36:24 +08:00
KernelErr
4363e3f76a Publish 0.1.3 and update README 2021-10-26 23:58:00 +08:00
KernelErr
ee9d0685b3 Refactor TCP and KCP test 2021-10-26 23:52:07 +08:00
KernelErr
421ad8c979 Fix example config 2021-10-26 23:27:03 +08:00
15 changed files with 889 additions and 415 deletions

39
.github/workflows/publish-binaries.yml vendored Normal file
View File

@@ -0,0 +1,39 @@
on:
release:
types: [published]
name: Publish binaries to release
jobs:
publish:
name: Publish for ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
include:
- os: ubuntu-latest
artifact_name: fourth
asset_name: fourth-linux-amd64
- os: macos-latest
artifact_name: fourth
asset_name: fourth-macos-amd64
- os: windows-latest
artifact_name: fourth.exe
asset_name: fourth-windows-amd64.exe
steps:
- uses: hecrj/setup-rust-action@master
with:
rust-version: stable
- uses: actions/checkout@v2
- name: Build
run: cargo build --release --locked
- name: Publish
uses: svenstaro/upload-release-action@v1-release
with:
repo_token: ${{ secrets.PUBLISH_TOKEN }}
file: target/release/${{ matrix.artifact_name }}
asset_name: ${{ matrix.asset_name }}
tag: ${{ github.ref }}

670
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "fourth" name = "fourth"
version = "0.1.2" version = "0.1.5"
edition = "2021" edition = "2021"
authors = ["LI Rui <lr_cn@outlook.com>"] authors = ["LI Rui <lr_cn@outlook.com>"]
license = "Apache-2.0" license = "Apache-2.0"
@@ -19,12 +19,13 @@ exclude = [".*"]
log = "0.4" log = "0.4"
pretty_env_logger = "0.4" pretty_env_logger = "0.4"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8" serde_yaml = "0.9.21"
futures = "0.3" futures = "0.3"
tls-parser = "0.11" tls-parser = "0.11"
url = "2.2.2"
time = { version = "0.3.1", features = ["local-offset", "formatting"] }
tokio = { version = "1.0", features = ["full"] } tokio = { version = "1.0", features = ["full"] }
bytes = "1.1" bytes = "1.1"
kcp = "0.4"
byte_string = "1" byte_string = "1"

View File

@@ -4,6 +4,8 @@
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml) [![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
**Under heavy development, version 0.1 may update frequently**
Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP/KCP data to remote addresses(only TCP) according to configuration. Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP/KCP data to remote addresses(only TCP) according to configuration.
## Features ## Features
@@ -29,36 +31,35 @@ Or you can use Cargo to install Fourth:
$ cargo install fourth $ cargo install fourth
``` ```
Or you can download binary file form the Release page.
## Configuration ## Configuration
Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, here is an example: Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, and you can set custom path to environment variable `FOURTH_CONFIG`, here is an minimal viable example:
```yaml ```yaml
version: 1 version: 1
log: info log: info
servers: servers:
example_server: proxy_server:
listen:
- "0.0.0.0:443"
- "[::]:443"
tls: true # Enable TLS features like SNI
sni:
proxy.example.com: proxy
www.example.com: nginx
default: ban
relay_server:
listen: listen:
- "127.0.0.1:8081" - "127.0.0.1:8081"
default: remote default: remote
upstream: upstream:
nginx: "127.0.0.1:8080" remote: "tcp://www.remote.example.com:8082" # proxy to remote address
proxy: "127.0.0.1:1024"
other: "www.remote.example.com:8082" # proxy to remote address
``` ```
Built-in two upstreams: ban(terminate connection immediately), echo Built-in two upstreams: ban(terminate connection immediately), echo. For detailed configuration, check [this example](./example-config.yaml).
## Performance Benchmark
Tested on 4C2G server:
Use fourth to proxy to Nginx(QPS of direct connection: ~120000): ~70000 req/s (Command: `wrk -t200 -c1000 -d120s --latency http://proxy-server:8081`)
Use fourth to proxy to local iperf3: 8Gbps
## Thanks ## Thanks

View File

@@ -6,6 +6,8 @@
[English](/README-EN.md) [English](/README-EN.md)
**积极开发中0.1版本迭代可能较快**
Fourth是一个Rust实现的Layer 4代理用于监听指定端口TCP/KCP流量并根据规则转发到指定目标目前只支持TCP Fourth是一个Rust实现的Layer 4代理用于监听指定端口TCP/KCP流量并根据规则转发到指定目标目前只支持TCP
## 功能 ## 功能
@@ -31,41 +33,37 @@ $ cargo build --release
$ cargo install fourth $ cargo install fourth
``` ```
或者您也可以直接从Release中下载二进制文件。
## 配置 ## 配置
Fourth使用yaml格式的配置文件默认情况下会读取`/etc/fourth/config.yaml`,如下是一个示例配置 Fourth使用yaml格式的配置文件默认情况下会读取`/etc/fourth/config.yaml`您也可以设置自定义路径到环境变量`FOURTH_CONFIG`如下是一个最小有效配置
```yaml ```yaml
version: 1 version: 1
log: info log: info
servers: servers:
example_server:
listen:
- "0.0.0.0:443"
- "[::]:443"
tls: true # Enable TLS features like SNI filtering
sni:
proxy.example.com: proxy
www.example.com: nginx
default: ban
proxy_server: proxy_server:
listen: listen:
- "127.0.0.1:8081" - "127.0.0.1:8081"
default: remote default: remote
kcp_server:
protocol: kcp # default TCP
listen:
- "127.0.0.1:8082"
default: echo
upstream: upstream:
nginx: "127.0.0.1:8080" remote: "tcp://www.remote.example.com:8082" # proxy to remote address
proxy: "127.0.0.1:1024"
other: "www.remote.example.com:8082" # proxy to remote address
``` ```
内置两个的upstreamban立即中断连接、echo返回读到的数据 内置两个的upstreamban立即中断连接、echo返回读到的数据更详细的配置可以参考[示例配置](./example-config.yaml)。
注意:[::]会默认同时绑定IPv4和IPv6。
## 性能测试
在4C2G的服务器上测试
使用Fourth代理到Nginx直连QPS 120000: ~70000req/s (测试命令:`wrk -t200 -c1000 -d120s --latency http://proxy-server:8081 `
使用Fourth代理到本地iperf38Gbps
## io_uring? ## io_uring?

View File

@@ -22,6 +22,6 @@ servers:
default: echo default: echo
upstream: upstream:
nginx: "127.0.0.1:8080" nginx: "tcp://127.0.0.1:8080"
proxy: "127.0.0.1:1024" proxy: "tcp://127.0.0.1:1024"
other: "www.remote.example.com:8082" # proxy to remote address remote: "tcp://www.remote.example.com:8082" # proxy to remote address

View File

@@ -1,12 +1,25 @@
use log::debug; use log::{debug, warn};
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::io::{Error as IOError, Read}; use std::io::{Error as IOError, Read};
use std::net::SocketAddr;
use tokio::sync::Mutex;
use url::Url;
use tokio::time::Instant;
use time::OffsetDateTime;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Config { pub struct Config {
pub base: BaseConfig, pub base: ParsedConfig,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct ParsedConfig {
pub version: i32,
pub log: Option<String>,
pub servers: HashMap<String, ServerConfig>,
pub upstream: HashMap<String, Upstream>,
} }
#[derive(Debug, Default, Deserialize, Clone)] #[derive(Debug, Default, Deserialize, Clone)]
@@ -26,6 +39,84 @@ pub struct ServerConfig {
pub default: Option<String>, pub default: Option<String>,
} }
#[derive(Debug, Clone, Deserialize)]
pub enum Upstream {
Ban,
Echo,
Custom(CustomUpstream),
}
#[derive(Debug)]
struct Addr(Mutex<Vec<SocketAddr>>);
impl Default for Addr {
fn default() -> Self {
Self(Default::default())
}
}
impl Clone for Addr {
fn clone(&self) -> Self {
tokio::task::block_in_place(|| Self(Mutex::new(self.0.blocking_lock().clone())))
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct CustomUpstream {
pub name: String,
pub addr: String,
pub protocol: String,
#[serde(skip_deserializing)]
addresses: Addr,
}
impl CustomUpstream {
pub async fn resolve_addresses(&self) -> std::io::Result<()> {
{
let addr = self.addresses.0.lock().await;
if addr.len() > 0 {
debug!("Already have addresses: {:?}", &addr);
return Ok(());
}
}
debug!("Resolving addresses for {}", &self.addr);
let addresses = tokio::net::lookup_host(self.addr.clone()).await?;
let mut addr: Vec<SocketAddr> = match self.protocol.as_ref() {
"tcp4" => addresses.into_iter().filter(|a| a.is_ipv4()).collect(),
"tcp6" => addresses.into_iter().filter(|a| a.is_ipv6()).collect(),
_ => addresses.collect(),
};
debug!("Got addresses for {}: {:?}", &self.addr, &addr);
debug!("Resolved at {}", OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).expect("Format"));
{
let mut self_addr = self.addresses.0.lock().await;
self_addr.clear();
self_addr.append(&mut addr);
}
Ok(())
}
pub async fn get_addresses(&self) -> Vec<SocketAddr> {
let a = self.addresses.0.lock().await;
a.clone()
}
}
impl Default for CustomUpstream {
fn default() -> Self {
Self {
name: Default::default(),
addr: Default::default(),
protocol: Default::default(),
addresses: Default::default(),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum ConfigError { pub enum ConfigError {
IO(IOError), IO(IOError),
@@ -41,29 +132,150 @@ impl Config {
} }
} }
fn load_config(path: &str) -> Result<BaseConfig, ConfigError> { fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
let mut contents = String::new(); let mut contents = String::new();
let mut file = (File::open(path))?; let mut file = (File::open(path))?;
(file.read_to_string(&mut contents))?; (file.read_to_string(&mut contents))?;
let parsed: BaseConfig = serde_yaml::from_str(&contents)?; let base: BaseConfig = serde_yaml::from_str(&contents)?;
if parsed.version != 1 { if base.version != 1 {
return Err(ConfigError::Custom( return Err(ConfigError::Custom(
"Unsupported config version".to_string(), "Unsupported config version".to_string(),
)); ));
} }
let log_level = parsed.log.clone().unwrap_or_else(|| "info".to_string()); let log_level = base.log.clone().unwrap_or_else(|| "info".to_string());
if !log_level.eq("disable") { if !log_level.eq("disable") {
std::env::set_var("FOURTH_LOG", log_level.clone()); std::env::set_var("FOURTH_LOG", log_level.clone());
pretty_env_logger::init_custom_env("FOURTH_LOG"); pretty_env_logger::init_custom_env("FOURTH_LOG");
debug!("Set log level to {}", log_level); debug!("Set log level to {}", log_level);
} }
debug!("Config version {}", parsed.version); debug!("Config version {}", base.version);
Ok(parsed) let mut parsed_upstream: HashMap<String, Upstream> = HashMap::new();
for (name, upstream) in base.upstream.iter() {
let upstream_url = match Url::parse(upstream) {
Ok(url) => url,
Err(_) => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
upstream
)))
}
};
let upstream_host = match upstream_url.host_str() {
Some(host) => host,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
upstream
)))
}
};
let upsteam_port = match upstream_url.port_or_known_default() {
Some(port) => port,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
upstream
)))
}
};
match upstream_url.scheme() {
"tcp" | "tcp4" | "tcp6" => {}
_ => {
return Err(ConfigError::Custom(format!(
"Invalid upstream scheme {}",
upstream
)))
}
}
parsed_upstream.insert(
name.to_string(),
Upstream::Custom(CustomUpstream {
name: name.to_string(),
addr: format!("{}:{}", upstream_host, upsteam_port),
protocol: upstream_url.scheme().to_string(),
..Default::default()
}),
);
}
parsed_upstream.insert("ban".to_string(), Upstream::Ban);
parsed_upstream.insert("echo".to_string(), Upstream::Echo);
let parsed = ParsedConfig {
version: base.version,
log: base.log,
servers: base.servers,
upstream: parsed_upstream,
};
verify_config(parsed)
}
fn verify_config(config: ParsedConfig) -> Result<ParsedConfig, ConfigError> {
let mut used_upstreams: HashSet<String> = HashSet::new();
let mut upstream_names: HashSet<String> = HashSet::new();
let mut listen_addresses: HashSet<String> = HashSet::new();
// Check for duplicate upstream names
for (name, _) in config.upstream.iter() {
if upstream_names.contains(name) {
return Err(ConfigError::Custom(format!(
"Duplicate upstream name {}",
name
)));
}
upstream_names.insert(name.to_string());
}
for (_, server) in config.servers.clone() {
// check for duplicate listen addresses
for listen in server.listen {
if listen_addresses.contains(&listen) {
return Err(ConfigError::Custom(format!(
"Duplicate listen address {}",
listen
)));
}
listen_addresses.insert(listen.to_string());
}
if server.tls.unwrap_or_default() && server.sni.is_some() {
for (_, val) in server.sni.unwrap() {
used_upstreams.insert(val.to_string());
}
}
if server.default.is_some() {
used_upstreams.insert(server.default.unwrap().to_string());
}
for key in &used_upstreams {
if !config.upstream.contains_key(key) {
return Err(ConfigError::Custom(format!("Upstream {} not found", key)));
}
}
}
for key in &upstream_names {
if !used_upstreams.contains(key) && !key.eq("echo") && !key.eq("ban") {
warn!("Upstream {} not used", key);
}
}
Ok(config)
} }
impl From<IOError> for ConfigError { impl From<IOError> for ConfigError {
@@ -87,7 +299,7 @@ mod tests {
let config = Config::new("tests/config.yaml").unwrap(); let config = Config::new("tests/config.yaml").unwrap();
assert_eq!(config.base.version, 1); assert_eq!(config.base.version, 1);
assert_eq!(config.base.log.unwrap(), "disable"); assert_eq!(config.base.log.unwrap(), "disable");
assert_eq!(config.base.servers.len(), 4); assert_eq!(config.base.servers.len(), 5);
assert_eq!(config.base.upstream.len(), 3); assert_eq!(config.base.upstream.len(), 3 + 2); // Add ban and echo upstreams
} }
} }

View File

@@ -6,9 +6,13 @@ use crate::config::Config;
use crate::servers::Server; use crate::servers::Server;
use log::{debug, error}; use log::{debug, error};
use std::env;
fn main() { fn main() {
let config = match Config::new("/etc/fourth/config.yaml") { let config_path =
env::var("FOURTH_CONFIG").unwrap_or_else(|_| "/etc/fourth/config.yaml".to_string());
let config = match Config::new(&config_path) {
Ok(config) => config, Ok(config) => config,
Err(e) => { Err(e) => {
println!("Could not load config: {:?}", e); println!("Could not load config: {:?}", e);
@@ -20,6 +24,6 @@ fn main() {
let mut server = Server::new(config.base); let mut server = Server::new(config.base);
debug!("{:?}", server); debug!("{:?}", server);
let res = server.run(); let _ = server.run();
error!("Server returned an error: {:?}", res); error!("Server ended with errors");
} }

View File

@@ -1 +1 @@
pub mod kcp; //pub mod kcp;

View File

@@ -5,13 +5,13 @@ use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
mod protocol; mod protocol;
use crate::config::BaseConfig; use crate::config::{ParsedConfig, Upstream};
use protocol::{kcp, tcp}; use protocol::tcp;
#[derive(Debug)] #[derive(Debug)]
pub struct Server { pub struct Server {
pub proxies: Vec<Arc<Proxy>>, pub proxies: Vec<Arc<Proxy>>,
pub config: BaseConfig, pub config: ParsedConfig,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -22,11 +22,11 @@ pub struct Proxy {
pub tls: bool, pub tls: bool,
pub sni: Option<HashMap<String, String>>, pub sni: Option<HashMap<String, String>>,
pub default: String, pub default: String,
pub upstream: HashMap<String, String>, pub upstream: HashMap<String, Upstream>,
} }
impl Server { impl Server {
pub fn new(config: BaseConfig) -> Self { pub fn new(config: ParsedConfig) -> Self {
let mut new_server = Server { let mut new_server = Server {
proxies: Vec::new(), proxies: Vec::new(),
config: config.clone(), config: config.clone(),
@@ -53,6 +53,7 @@ impl Server {
continue; continue;
} }
}; };
let proxy = Proxy { let proxy = Proxy {
name: name.clone(), name: name.clone(),
listen: listen_addr, listen: listen_addr,
@@ -82,11 +83,29 @@ impl Server {
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
match config.protocol.as_ref() { match config.protocol.as_ref() {
"tcp" => { "tcp" => {
let _ = tcp::proxy(config).await; let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
} }
"kcp" => {
let _ = kcp::proxy(config).await;
} }
"tcp4" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
"tcp6" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
// "kcp" => {
// let res = kcp::proxy(config.clone()).await;
// if res.is_err() {
// error!("Failed to start {}: {}", config.name, res.err().unwrap());
// }
// }
_ => { _ => {
error!("Invalid protocol: {}", config.protocol) error!("Invalid protocol: {}", config.protocol)
} }
@@ -103,13 +122,12 @@ impl Server {
} }
#[cfg(test)] #[cfg(test)]
mod test { mod tests {
use crate::plugins::kcp::{KcpConfig, KcpStream}; //use crate::plugins::kcp::{KcpConfig, KcpStream};
use std::net::SocketAddr;
use std::thread::{self, sleep}; use std::thread::{self, sleep};
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::TcpListener;
use super::*; use super::*;
@@ -117,13 +135,24 @@ mod test {
async fn tcp_mock_server() { async fn tcp_mock_server() {
let server_addr: SocketAddr = "127.0.0.1:54599".parse().unwrap(); let server_addr: SocketAddr = "127.0.0.1:54599".parse().unwrap();
let listener = TcpListener::bind(server_addr).await.unwrap(); let listener = TcpListener::bind(server_addr).await.unwrap();
loop {
let (mut stream, _) = listener.accept().await.unwrap(); let (mut stream, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 2];
let mut n = stream.read(&mut buf).await.unwrap();
while n > 0 {
stream.write(b"hello").await.unwrap(); stream.write(b"hello").await.unwrap();
if buf.eq(b"by") {
stream.shutdown().await.unwrap(); stream.shutdown().await.unwrap();
break;
}
n = stream.read(&mut buf).await.unwrap();
}
stream.shutdown().await.unwrap();
}
} }
#[tokio::test] #[tokio::test]
async fn test_tcp_proxy() { async fn test_proxy() {
use crate::config::Config; use crate::config::Config;
let config = Config::new("tests/config.yaml").unwrap(); let config = Config::new("tests/config.yaml").unwrap();
let mut server = Server::new(config.base); let mut server = Server::new(config.base);
@@ -135,44 +164,21 @@ mod test {
let _ = server.run(); let _ = server.run();
}); });
sleep(Duration::from_secs(1)); // wait for server to start sleep(Duration::from_secs(1)); // wait for server to start
let mut conn = TcpStream::connect("127.0.0.1:54500").await.unwrap();
// test TCP proxy
let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54500")
.await
.unwrap();
let mut buf = [0u8; 5]; let mut buf = [0u8; 5];
conn.write(b"hi").await.unwrap();
conn.read(&mut buf).await.unwrap(); conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello"); assert_eq!(&buf, b"hello");
conn.shutdown().await.unwrap(); conn.shutdown().await.unwrap();
}
#[tokio::test] // test TCP echo
async fn test_tcp_echo_server() { let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54956")
use crate::config::Config; .await
let config = Config::new("tests/config.yaml").unwrap(); .unwrap();
let mut server = Server::new(config.base);
thread::spawn(move || {
let _ = server.run();
});
sleep(Duration::from_secs(1)); // wait for server to start
let mut conn = TcpStream::connect("127.0.0.1:54956").await.unwrap();
let mut buf = [0u8; 1];
for i in 0..=255u8 {
conn.write(&[i]).await.unwrap();
conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, &[i]);
}
conn.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_kcp_echo_server() {
use crate::config::Config;
let config = Config::new("tests/config.yaml").unwrap();
let mut server = Server::new(config.base);
thread::spawn(move || {
let _ = server.run();
});
sleep(Duration::from_secs(1)); // wait for server to start
let kcp_config = KcpConfig::default();
let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap();
let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
let mut buf = [0u8; 1]; let mut buf = [0u8; 1];
for i in 0..=10u8 { for i in 0..=10u8 {
conn.write(&[i]).await.unwrap(); conn.write(&[i]).await.unwrap();
@@ -180,5 +186,27 @@ mod test {
assert_eq!(&buf, &[i]); assert_eq!(&buf, &[i]);
} }
conn.shutdown().await.unwrap(); conn.shutdown().await.unwrap();
// test KCP echo
// let kcp_config = KcpConfig::default();
// let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap();
// let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
// let mut buf = [0u8; 1];
// for i in 0..=10u8 {
// conn.write(&[i]).await.unwrap();
// conn.read(&mut buf).await.unwrap();
// assert_eq!(&buf, &[i]);
// }
// conn.shutdown().await.unwrap();
//
// // test KCP proxy and close mock server
// let kcp_config = KcpConfig::default();
// let server_addr: SocketAddr = "127.0.0.1:54958".parse().unwrap();
// let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
// let mut buf = [0u8; 5];
// conn.write(b"by").await.unwrap();
// conn.read(&mut buf).await.unwrap();
// assert_eq!(&buf, b"hello");
// conn.shutdown().await.unwrap();
} }
} }

View File

@@ -1,3 +1,4 @@
use crate::config::Upstream;
use crate::plugins::kcp::{KcpConfig, KcpListener, KcpStream}; use crate::plugins::kcp::{KcpConfig, KcpListener, KcpStream};
use crate::servers::Proxy; use crate::servers::Proxy;
use futures::future::try_join; use futures::future::try_join;
@@ -52,25 +53,30 @@ async fn accept(
"No upstream named {:?} on server {:?}", "No upstream named {:?} on server {:?}",
proxy.default, proxy.name proxy.default, proxy.name
); );
return process(inbound, &proxy.default).await; return process(inbound, proxy.upstream.get(&proxy.default).unwrap()).await;
// ToDo: Remove unwrap and check default option
} }
}; };
return process(inbound, upstream).await; return process(inbound, upstream).await;
} }
async fn process(mut inbound: KcpStream, upstream: &str) -> Result<(), Box<dyn std::error::Error>> { async fn process(
if upstream == "ban" { mut inbound: KcpStream,
upstream: &Upstream,
) -> Result<(), Box<dyn std::error::Error>> {
match upstream {
Upstream::Ban => {
let _ = inbound.shutdown(); let _ = inbound.shutdown();
return Ok(()); }
} else if upstream == "echo" { Upstream::Echo => {
let (mut ri, mut wi) = io::split(inbound); let (mut ri, mut wi) = io::split(inbound);
let inbound_to_inbound = copy(&mut ri, &mut wi); let inbound_to_inbound = copy(&mut ri, &mut wi);
let bytes_tx = inbound_to_inbound.await; let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx); debug!("Bytes read: {:?}", bytes_tx);
return Ok(());
} }
Upstream::Custom(custom) => match custom.protocol.as_ref() {
let outbound = TcpStream::connect(upstream).await?; "tcp" => {
let outbound = TcpStream::connect(custom.addr.clone()).await?;
let (mut ri, mut wi) = io::split(inbound); let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound); let (mut ro, mut wo) = io::split(outbound);
@@ -78,10 +84,16 @@ async fn process(mut inbound: KcpStream, upstream: &str) -> Result<(), Box<dyn s
let inbound_to_outbound = copy(&mut ri, &mut wo); let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi); let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; let (bytes_tx, bytes_rx) =
try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
}
_ => {
error!("Reached unknown protocol: {:?}", custom.protocol);
}
},
};
Ok(()) Ok(())
} }

View File

@@ -1,3 +1,3 @@
pub mod kcp; //pub mod kcp;
pub mod tcp; pub mod tcp;
pub mod tls; pub mod tls;

View File

@@ -1,7 +1,8 @@
use crate::config::Upstream;
use crate::servers::protocol::tls::get_sni; use crate::servers::protocol::tls::get_sni;
use crate::servers::Proxy; use crate::servers::Proxy;
use futures::future::try_join; use futures::future::try_join;
use log::{debug, error, warn}; use log::{debug, error, info, warn};
use std::sync::Arc; use std::sync::Arc;
use tokio::io; use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
@@ -33,7 +34,7 @@ pub async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>>
} }
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> { async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
debug!("New connection from {:?}", inbound.peer_addr()?); info!("New connection from {:?}", inbound.peer_addr()?);
let upstream_name = match proxy.tls { let upstream_name = match proxy.tls {
false => proxy.default.clone(), false => proxy.default.clone(),
@@ -71,25 +72,46 @@ async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std
"No upstream named {:?} on server {:?}", "No upstream named {:?} on server {:?}",
proxy.default, proxy.name proxy.default, proxy.name
); );
return process(inbound, &proxy.default).await; return process(inbound, proxy.upstream.get(&proxy.default).unwrap().clone()).await;
// ToDo: Remove unwrap and check default option
} }
}; };
return process(inbound, upstream).await;
match upstream {
Upstream::Custom(u) => u.resolve_addresses().await?,
_ => {}
} }
async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box<dyn std::error::Error>> { return process(inbound, upstream.clone()).await;
if upstream == "ban" { }
async fn process(
mut inbound: TcpStream,
upstream: Upstream,
) -> Result<(), Box<dyn std::error::Error>> {
match upstream {
Upstream::Ban => {
let _ = inbound.shutdown(); let _ = inbound.shutdown();
return Ok(()); }
} else if upstream == "echo" { Upstream::Echo => {
let (mut ri, mut wi) = io::split(inbound); let (mut ri, mut wi) = io::split(inbound);
let inbound_to_inbound = copy(&mut ri, &mut wi); let inbound_to_inbound = copy(&mut ri, &mut wi);
let bytes_tx = inbound_to_inbound.await; let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx); debug!("Bytes read: {:?}", bytes_tx);
return Ok(());
} }
Upstream::Custom(custom) => {
custom.resolve_addresses().await?;
let outbound = match custom.protocol.as_ref() {
"tcp4" | "tcp6" | "tcp" => {
TcpStream::connect(custom.get_addresses().await.as_slice()).await?
}
_ => {
error!("Reached unknown protocol: {:?}", custom.protocol);
return Err("Reached unknown protocol".into());
}
};
let outbound = TcpStream::connect(upstream).await?; debug!("Connected to {:?}", outbound.peer_addr().unwrap());
let (mut ri, mut wi) = io::split(inbound); let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound); let (mut ro, mut wo) = io::split(outbound);
@@ -100,7 +122,8 @@ async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box<dyn s
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?; let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
}
};
Ok(()) Ok(())
} }

View File

@@ -49,6 +49,7 @@ pub fn get_sni(buf: &[u8]) -> Vec<String> {
} }
} }
debug!("Found SNIs: {:?}", &snis);
snis snis
} }

View File

@@ -11,21 +11,26 @@ servers:
proxy.test.com: proxy proxy.test.com: proxy
www.test.com: web www.test.com: web
default: ban default: ban
echo_server:
listen:
- "0.0.0.0:54956"
default: echo
tcp_server: tcp_server:
listen: listen:
- "127.0.0.1:54500" - "127.0.0.1:54500"
default: tester default: tester
tcp_echo_server:
listen:
- "0.0.0.0:54956"
default: echo
kcp_server: kcp_server:
protocol: kcp
listen:
- "127.0.0.1:54958"
default: tester
kcp_echo_server:
protocol: kcp protocol: kcp
listen: listen:
- "127.0.0.1:54959" - "127.0.0.1:54959"
default: echo default: echo
upstream: upstream:
web: "127.0.0.1:8080" web: "tcp://127.0.0.1:8080"
proxy: "www.example.com:1024" proxy: "tcp://www.example.com:1024"
tester: "127.0.0.1:54599" tester: "tcp://127.0.0.1:54599"