19 Commits

Author SHA1 Message Date
086e2b4766 Tag 0.1.7
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Critical bug fixes

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 23:11:29 +02:00
5f0de72b88 Remove unused variable
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:56:46 +02:00
40b890bc13 Add much better debug logging of address resolution
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:54:41 +02:00
483c058105 Slightly better way of finding the config file
It now also looks in the current working directory.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:53:46 +02:00
6349fc6502 Prevent unnecessary clone
This also ensures that the address resolver actually keeps state.
Otherwise it was cloned before each resolution, resulting in it never
keeping the resolved addresses.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:52:46 +02:00
cd35859c9b Initialize UpstreamAddress with actual address
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:51:25 +02:00
7f399af713 Update rust and zig
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-24 18:45:04 +02:00
fd86162450 Version 0.1.6
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:52:46 +02:00
a6748f30d9 Make English readme the default
Some checks reported errors
continuous-integration/drone/pr Build was killed
continuous-integration/drone/push Build is passing
Since I'm unable to read Chinese

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:45:42 +02:00
902b2c0d55 Update build file
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:33:18 +02:00
fb7a7d9cae Update gitignore
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:33:07 +02:00
1c325f45b4 Add sample configuration file
Some checks failed
continuous-integration/drone Build is failing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 19:29:49 +02:00
79c931fc38 Add build instructions
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 19:29:29 +02:00
915e39b684 Extract DNS address resolution
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-16 09:32:05 +02:00
0c5153bbd6 Rename Proxy::default to ::default_action
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-16 09:31:20 +02:00
01784ee3fd Update dependencies 2023-08-16 09:29:18 +02:00
f4bc441ca8 Enable explicit ipv4 / ipv6 proxying
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-06-02 17:35:29 +02:00
f010f8c76b Update dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-06-02 17:35:13 +02:00
8fbc0c370a Add error messages when failed to start server 2021-12-30 22:05:25 +08:00
19 changed files with 902 additions and 523 deletions

3
.cargo/config.toml Normal file
View File

@ -0,0 +1,3 @@
[profile.release]
lto = "thin"
strip = true

92
.drone.jsonnet Normal file
View File

@ -0,0 +1,92 @@
local executableName = 'fourth';
local build_image = 'img.kie.rs/jjkiers/rust-cross:rust1.71.1-zig';
local archs = [
{ target: 'aarch64-unknown-linux-musl', short: 'arm64-musl' },
{ target: 'x86_64-pc-windows-gnu', short: 'windows' },
{ target: 'x86_64-unknown-linux-musl', short: 'amd64-musl' },
];
local getStepName(arch) = 'Build for ' + arch.short;
local builtExecutableName(arch) = executableName + if std.length(std.findSubstr(arch.short, 'windows')) > 0 then '.exe' else '';
local targetExecutableName(arch) = executableName + '-' + arch.target + if std.length(std.findSubstr(arch.short, 'windows')) > 0 then '.exe' else '';
local getVolumeName(arch) = 'target-' + arch.target;
local getLocalVolumes(arch) = [
{
name: getVolumeName(arch),
temp: {},
}
for arch in archs
];
local add_build_steps() = [
{
name: getStepName(arch),
image: build_image,
commands: [
'echo Hello World from Jsonnet on ' + arch.target + '!',
'cargo zigbuild --release --target ' + arch.target,
'cp target/' + arch.target + '/release/' + builtExecutableName(arch) + ' artifacts/' + targetExecutableName(arch),
'rm -rf target/' + arch.target + '/release/*',
],
depends_on: ['Prepare'],
volumes: [{
name: getVolumeName(arch),
path: '/drone/src/target',
}],
}
for arch in archs
];
{
kind: 'pipeline',
type: 'docker',
name: 'default',
platform: {
arch: 'amd64',
},
steps:
[{
name: 'Prepare',
image: build_image,
commands: [
'mkdir artifacts',
'echo Using image: ' + build_image,
'cargo --version',
'rustc --version',
],
}] +
add_build_steps() +
[
{
name: 'Show built artifacts',
image: build_image,
commands: [
'ls -lah artifacts',
],
depends_on: [getStepName(a) for a in archs],
},
{
name: 'Create release on gitea',
image: 'plugins/gitea-release',
settings: {
api_key: {
from_secret: 'gitea_token',
},
base_url: 'https://code.kiers.eu',
files: 'artifacts/*',
checksum: 'sha256',
},
when: {
event: ['tag', 'promote'],
},
depends_on: ['Show built artifacts'],
},
],
volumes: getLocalVolumes(archs),
image_pull_secrets: ['docker_private_repo'],
}

View File

@ -1,39 +0,0 @@
on:
release:
types: [published]
name: Publish binaries to release
jobs:
publish:
name: Publish for ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
include:
- os: ubuntu-latest
artifact_name: fourth
asset_name: fourth-linux-amd64
- os: macos-latest
artifact_name: fourth
asset_name: fourth-macos-amd64
- os: windows-latest
artifact_name: fourth.exe
asset_name: fourth-windows-amd64.exe
steps:
- uses: hecrj/setup-rust-action@master
with:
rust-version: stable
- uses: actions/checkout@v2
- name: Build
run: cargo build --release --locked
- name: Publish
uses: svenstaro/upload-release-action@v1-release
with:
repo_token: ${{ secrets.PUBLISH_TOKEN }}
file: target/release/${{ matrix.artifact_name }}
asset_name: ${{ matrix.asset_name }}
tag: ${{ github.ref }}

View File

@ -1,24 +0,0 @@
name: Rust
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Upgrade Rust
run: rustup update
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target /target
config.yaml

665
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "fourth" name = "fourth"
version = "0.1.4" version = "0.1.7"
edition = "2021" edition = "2021"
authors = ["LI Rui <lr_cn@outlook.com>"] authors = ["LI Rui <lr_cn@outlook.com>"]
license = "Apache-2.0" license = "Apache-2.0"
@ -19,13 +19,12 @@ exclude = [".*"]
log = "0.4" log = "0.4"
pretty_env_logger = "0.4" pretty_env_logger = "0.4"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8" serde_yaml = "0.9.21"
futures = "0.3" futures = "0.3"
tls-parser = "0.11" tls-parser = "0.11"
url = "2.2.2" url = "2.2.2"
time = { version = "0.3.1", features = ["local-offset", "formatting"] }
tokio = { version = "1.0", features = ["full"] } tokio = { version = "1.0", features = ["full"] }
bytes = "1.1" bytes = "1.1"
kcp = "0.4"
byte_string = "1" byte_string = "1"

View File

@ -1,70 +0,0 @@
# Fourth
> Hey, now we are on level 4!
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
**Under heavy development, version 0.1 may update frequently**
Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP/KCP data to remote addresses(only TCP) according to configuration.
## Features
- Listen on specific port and proxy to local or remote port
- SNI-based rule without terminating TLS connection
- Allow KCP inbound(warning: untested)
## Installation
To gain best performance on your computer's architecture, please consider build the source code. First, you may need [Rust tool chain](https://rustup.rs/).
```bash
$ cd fourth
$ cargo build --release
```
Binary file will be generated at `target/release/fourth`, or you can use `cargo install --path .` to install.
Or you can use Cargo to install Fourth:
```bash
$ cargo install fourth
```
Or you can download binary file form the Release page.
## Configuration
Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, and you can set custom path to environment variable `FOURTH_CONFIG`, here is an minimal viable example:
```yaml
version: 1
log: info
servers:
proxy_server:
listen:
- "127.0.0.1:8081"
default: remote
upstream:
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
```
Built-in two upstreams: ban(terminate connection immediately), echo. For detailed configuration, check [this example](./example-config.yaml).
## Performance Benchmark
Tested on 4C2G server:
Use fourth to proxy to Nginx(QPS of direct connection: ~120000): ~70000 req/s (Command: `wrk -t200 -c1000 -d120s --latency http://proxy-server:8081`)
Use fourth to proxy to local iperf3: 8Gbps
## Thanks
- [tokio_kcp](https://github.com/Matrix-Zhang/tokio_kcp)
## License
Fourth is available under terms of Apache-2.0.

80
README-ZH.md Normal file
View File

@ -0,0 +1,80 @@
# Fourth
> 这一波在第四层。
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
[English](/README-EN.md)
**积极开发中0.1版本迭代可能较快**
Fourth是一个Rust实现的Layer 4代理用于监听指定端口TCP/KCP流量并根据规则转发到指定目标目前只支持TCP
## 功能
- 监听指定端口代理到本地或远端指定端口
- 监听指定端口通过TLS ClientHello消息中的SNI进行分流
- 支持KCP入站警告未测试
## 安装方法
为了确保获得您架构下的最佳性能,请考虑自行编译,首选需要确保您拥有[Rust工具链](https://rustup.rs/)。
```bash
$ cd fourth
$ cargo build --release
```
将在`target/release/fourth`生成二进制文件,您也可以使用`cargo install --path . `来安装二进制文件。
或者您也可以使用Cargo直接安装
```bash
$ cargo install fourth
```
或者您也可以直接从Release中下载二进制文件。
## 配置
Fourth使用yaml格式的配置文件默认情况下会读取`/etc/fourth/config.yaml`,您也可以设置自定义路径到环境变量`FOURTH_CONFIG`,如下是一个最小有效配置:
```yaml
version: 1
log: info
servers:
proxy_server:
listen:
- "127.0.0.1:8081"
default: remote
upstream:
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
```
内置两个的upstreamban立即中断连接、echo返回读到的数据。更详细的配置可以参考[示例配置](./example-config.yaml)。
注意:[::]会默认同时绑定IPv4和IPv6。
## 性能测试
在4C2G的服务器上测试
使用Fourth代理到Nginx直连QPS 120000: ~70000req/s (测试命令:`wrk -t200 -c1000 -d120s --latency http://proxy-server:8081 `
使用Fourth代理到本地iperf38Gbps
## io_uring?
尽管经过了很多尝试我们发现目前一些Rust下面的io_uring实现存在问题我们使用的io_uring库实现尽管在吞吐量上可以做到单线程20Gbps相比之下Tokio仅有8Gbps但在QPS上存在性能损失较大的问题。因此在有成熟的io_uring实现之前我们仍然选择epoll。之后我们会持续关注相关进展。
可能以后会为Linux高内核版本的用户提供可选的io_uring加速。
## 感谢
- [tokio_kcp](https://github.com/Matrix-Zhang/tokio_kcp)
## 协议
Fourth以Apache-2.0协议开源。

View File

@ -1,43 +1,41 @@
# Fourth # Fourth
> 这一波在第四层。 > Hey, now we are on level 4!
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml) [![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
[English](/README-EN.md) **Under heavy development, version 0.1 may update frequently**
**积极开发中0.1版本迭代可能较快** Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP/KCP data to remote addresses(only TCP) according to configuration.
Fourth是一个Rust实现的Layer 4代理用于监听指定端口TCP/KCP流量并根据规则转发到指定目标目前只支持TCP ## Features
## 功能 - Listen on specific port and proxy to local or remote port
- SNI-based rule without terminating TLS connection
- Allow KCP inbound(warning: untested)
- 监听指定端口代理到本地或远端指定端口 ## Installation
- 监听指定端口通过TLS ClientHello消息中的SNI进行分流
- 支持KCP入站警告未测试
## 安装方法 To gain best performance on your computer's architecture, please consider build the source code. First, you may need [Rust tool chain](https://rustup.rs/).
为了确保获得您架构下的最佳性能,请考虑自行编译,首选需要确保您拥有[Rust工具链](https://rustup.rs/)。
```bash ```bash
$ cd fourth $ cd fourth
$ cargo build --release $ cargo build --release
``` ```
将在`target/release/fourth`生成二进制文件,您也可以使用`cargo install --path . `来安装二进制文件。 Binary file will be generated at `target/release/fourth`, or you can use `cargo install --path .` to install.
或者您也可以使用Cargo直接安装 Or you can use Cargo to install Fourth:
```bash ```bash
$ cargo install fourth $ cargo install fourth
``` ```
或者您也可以直接从Release中下载二进制文件。 Or you can download binary file form the Release page.
## 配置 ## Configuration
Fourth使用yaml格式的配置文件默认情况下会读取`/etc/fourth/config.yaml`,您也可以设置自定义路径到环境变量`FOURTH_CONFIG`,如下是一个最小有效配置: Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, and you can set custom path to environment variable `FOURTH_CONFIG`, here is an minimal viable example:
```yaml ```yaml
version: 1 version: 1
@ -53,26 +51,20 @@ upstream:
remote: "tcp://www.remote.example.com:8082" # proxy to remote address remote: "tcp://www.remote.example.com:8082" # proxy to remote address
``` ```
内置两个的upstreamban立即中断连接、echo返回读到的数据。更详细的配置可以参考[示例配置](./example-config.yaml) Built-in two upstreams: ban(terminate connection immediately), echo. For detailed configuration, check [this example](./example-config.yaml).
## 性能测试 ## Performance Benchmark
在4C2G的服务器上测试 Tested on 4C2G server:
使用Fourth代理到Nginx(直连QPS 120000: ~70000req/s (测试命令:`wrk -t200 -c1000 -d120s --latency http://proxy-server:8081 ` Use fourth to proxy to Nginx(QPS of direct connection: ~120000): ~70000 req/s (Command: `wrk -t200 -c1000 -d120s --latency http://proxy-server:8081`)
使用Fourth代理到本地iperf38Gbps Use fourth to proxy to local iperf3: 8Gbps
## io_uring? ## Thanks
尽管经过了很多尝试我们发现目前一些Rust下面的io_uring实现存在问题我们使用的io_uring库实现尽管在吞吐量上可以做到单线程20Gbps相比之下Tokio仅有8Gbps但在QPS上存在性能损失较大的问题。因此在有成熟的io_uring实现之前我们仍然选择epoll。之后我们会持续关注相关进展。
可能以后会为Linux高内核版本的用户提供可选的io_uring加速。
## 感谢
- [tokio_kcp](https://github.com/Matrix-Zhang/tokio_kcp) - [tokio_kcp](https://github.com/Matrix-Zhang/tokio_kcp)
## 协议 ## License
FourthApache-2.0协议开源。 Fourth is available under terms of Apache-2.0.

16
config.yaml.example Normal file
View File

@ -0,0 +1,16 @@
version: 1
log: debug
servers:
example_server:
listen:
- "0.0.0.0:8443"
tls: true # Enable TLS features like SNI filtering
sni:
api.example.org: example-api
www.example.org: gh-proxy
default: ban
upstream:
proxy: "tcp://new-www.example.org:443" # Connect over IPv4 or IPv6 to new-www.example.org:443
example-api: "tcp6://api-v1.example.com:443" # Connect over IPv6 to api-v1.example.com:443

View File

@ -1,8 +1,11 @@
use crate::servers::upstream_address::UpstreamAddress;
use log::{debug, warn}; use log::{debug, warn};
use serde::Deserialize; use serde::Deserialize;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::io::{Error as IOError, Read}; use std::io::{Error as IOError, Read};
use std::net::SocketAddr;
use tokio::sync::Mutex;
use url::Url; use url::Url;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -42,11 +45,46 @@ pub enum Upstream {
Custom(CustomUpstream), Custom(CustomUpstream),
} }
#[derive(Debug)]
struct Addr(Mutex<UpstreamAddress>);
impl Default for Addr {
fn default() -> Self {
Self(Default::default())
}
}
impl Clone for Addr {
fn clone(&self) -> Self {
tokio::task::block_in_place(|| Self(Mutex::new(self.0.blocking_lock().clone())))
}
}
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct CustomUpstream { pub struct CustomUpstream {
pub name: String, pub name: String,
pub addr: String, pub addr: String,
pub protocol: String, pub protocol: String,
#[serde(skip_deserializing)]
addresses: Addr,
}
impl CustomUpstream {
pub async fn resolve_addresses(&self) -> std::io::Result<Vec<SocketAddr>> {
let mut addr = self.addresses.0.lock().await;
addr.resolve((*self.protocol).into()).await
}
}
impl Default for CustomUpstream {
fn default() -> Self {
Self {
name: Default::default(),
addr: Default::default(),
protocol: Default::default(),
addresses: Default::default(),
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -119,11 +157,14 @@ fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
} }
}; };
if upstream_url.scheme() != "tcp" { match upstream_url.scheme() {
"tcp" | "tcp4" | "tcp6" => {}
_ => {
return Err(ConfigError::Custom(format!( return Err(ConfigError::Custom(format!(
"Invalid upstream scheme {}", "Invalid upstream scheme {}",
upstream upstream
))); )))
}
} }
parsed_upstream.insert( parsed_upstream.insert(
@ -132,6 +173,11 @@ fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
name: name.to_string(), name: name.to_string(),
addr: format!("{}:{}", upstream_host, upsteam_port), addr: format!("{}:{}", upstream_host, upsteam_port),
protocol: upstream_url.scheme().to_string(), protocol: upstream_url.scheme().to_string(),
addresses: Addr(Mutex::new(UpstreamAddress::new(format!(
"{}:{}",
upstream_host, upsteam_port
)))),
..Default::default()
}), }),
); );
} }
@ -198,7 +244,7 @@ fn verify_config(config: ParsedConfig) -> Result<ParsedConfig, ConfigError> {
} }
for key in &upstream_names { for key in &upstream_names {
if !used_upstreams.contains(key) { if !used_upstreams.contains(key) && !key.eq("echo") && !key.eq("ban") {
warn!("Upstream {} not used", key); warn!("Upstream {} not used", key);
} }
} }

View File

@ -5,11 +5,12 @@ mod servers;
use crate::config::Config; use crate::config::Config;
use crate::servers::Server; use crate::servers::Server;
use std::env;
use log::{debug, error}; use log::{debug, error};
use std::env;
use std::path::Path;
fn main() { fn main() {
let config_path = env::var("FOURTH_CONFIG").unwrap_or_else(|_| "/etc/fourth/config.yaml".to_string()); let config_path = find_config();
let config = match Config::new(&config_path) { let config = match Config::new(&config_path) {
Ok(config) => config, Ok(config) => config,
@ -23,6 +24,21 @@ fn main() {
let mut server = Server::new(config.base); let mut server = Server::new(config.base);
debug!("{:?}", server); debug!("{:?}", server);
let res = server.run(); let _ = server.run();
error!("Server returned an error: {:?}", res); error!("Server ended with errors");
}
fn find_config() -> String {
let config_path =
env::var("FOURTH_CONFIG").unwrap_or_else(|_| "/etc/fourth/config.yaml".to_string());
if Path::new(&config_path).exists() {
return config_path;
}
if Path::new("config.yaml").exists() {
return String::from("config.yaml");
}
String::from("")
} }

View File

@ -1 +1 @@
pub mod kcp; //pub mod kcp;

View File

@ -5,23 +5,24 @@ use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
mod protocol; mod protocol;
pub(crate) mod upstream_address;
use crate::config::{ParsedConfig, Upstream}; use crate::config::{ParsedConfig, Upstream};
use protocol::{kcp, tcp}; use protocol::tcp;
#[derive(Debug)] #[derive(Debug)]
pub struct Server { pub(crate) struct Server {
pub proxies: Vec<Arc<Proxy>>, pub proxies: Vec<Arc<Proxy>>,
pub config: ParsedConfig,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Proxy { pub(crate) struct Proxy {
pub name: String, pub name: String,
pub listen: SocketAddr, pub listen: SocketAddr,
pub protocol: String, pub protocol: String,
pub tls: bool, pub tls: bool,
pub sni: Option<HashMap<String, String>>, pub sni: Option<HashMap<String, String>>,
pub default: String, pub default_action: String,
pub upstream: HashMap<String, Upstream>, pub upstream: HashMap<String, Upstream>,
} }
@ -29,7 +30,6 @@ impl Server {
pub fn new(config: ParsedConfig) -> Self { pub fn new(config: ParsedConfig) -> Self {
let mut new_server = Server { let mut new_server = Server {
proxies: Vec::new(), proxies: Vec::new(),
config: config.clone(),
}; };
for (name, proxy) in config.servers.iter() { for (name, proxy) in config.servers.iter() {
@ -60,7 +60,7 @@ impl Server {
protocol: protocol.clone(), protocol: protocol.clone(),
tls, tls,
sni: sni.clone(), sni: sni.clone(),
default: default.clone(), default_action: default.clone(),
upstream: upstream.clone(), upstream: upstream.clone(),
}; };
new_server.proxies.push(Arc::new(proxy)); new_server.proxies.push(Arc::new(proxy));
@ -83,11 +83,29 @@ impl Server {
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
match config.protocol.as_ref() { match config.protocol.as_ref() {
"tcp" => { "tcp" => {
let _ = tcp::proxy(config).await; let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
} }
"kcp" => {
let _ = kcp::proxy(config).await;
} }
"tcp4" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
"tcp6" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
// "kcp" => {
// let res = kcp::proxy(config.clone()).await;
// if res.is_err() {
// error!("Failed to start {}: {}", config.name, res.err().unwrap());
// }
// }
_ => { _ => {
error!("Invalid protocol: {}", config.protocol) error!("Invalid protocol: {}", config.protocol)
} }
@ -105,12 +123,11 @@ impl Server {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::plugins::kcp::{KcpConfig, KcpStream}; //use crate::plugins::kcp::{KcpConfig, KcpStream};
use std::net::SocketAddr;
use std::thread::{self, sleep}; use std::thread::{self, sleep};
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::TcpListener;
use super::*; use super::*;
@ -149,7 +166,9 @@ mod tests {
sleep(Duration::from_secs(1)); // wait for server to start sleep(Duration::from_secs(1)); // wait for server to start
// test TCP proxy // test TCP proxy
let mut conn = TcpStream::connect("127.0.0.1:54500").await.unwrap(); let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54500")
.await
.unwrap();
let mut buf = [0u8; 5]; let mut buf = [0u8; 5];
conn.write(b"hi").await.unwrap(); conn.write(b"hi").await.unwrap();
conn.read(&mut buf).await.unwrap(); conn.read(&mut buf).await.unwrap();
@ -157,7 +176,9 @@ mod tests {
conn.shutdown().await.unwrap(); conn.shutdown().await.unwrap();
// test TCP echo // test TCP echo
let mut conn = TcpStream::connect("127.0.0.1:54956").await.unwrap(); let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54956")
.await
.unwrap();
let mut buf = [0u8; 1]; let mut buf = [0u8; 1];
for i in 0..=10u8 { for i in 0..=10u8 {
conn.write(&[i]).await.unwrap(); conn.write(&[i]).await.unwrap();
@ -167,25 +188,25 @@ mod tests {
conn.shutdown().await.unwrap(); conn.shutdown().await.unwrap();
// test KCP echo // test KCP echo
let kcp_config = KcpConfig::default(); // let kcp_config = KcpConfig::default();
let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap(); // let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap();
let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap(); // let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
let mut buf = [0u8; 1]; // let mut buf = [0u8; 1];
for i in 0..=10u8 { // for i in 0..=10u8 {
conn.write(&[i]).await.unwrap(); // conn.write(&[i]).await.unwrap();
conn.read(&mut buf).await.unwrap(); // conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, &[i]); // assert_eq!(&buf, &[i]);
} // }
conn.shutdown().await.unwrap(); // conn.shutdown().await.unwrap();
//
// test KCP proxy and close mock server // // test KCP proxy and close mock server
let kcp_config = KcpConfig::default(); // let kcp_config = KcpConfig::default();
let server_addr: SocketAddr = "127.0.0.1:54958".parse().unwrap(); // let server_addr: SocketAddr = "127.0.0.1:54958".parse().unwrap();
let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap(); // let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
let mut buf = [0u8; 5]; // let mut buf = [0u8; 5];
conn.write(b"by").await.unwrap(); // conn.write(b"by").await.unwrap();
conn.read(&mut buf).await.unwrap(); // conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello"); // assert_eq!(&buf, b"hello");
conn.shutdown().await.unwrap(); // conn.shutdown().await.unwrap();
} }
} }

View File

@ -1,3 +1,3 @@
pub mod kcp; //pub mod kcp;
pub mod tcp; pub mod tcp;
pub mod tls; pub mod tls;

View File

@ -2,13 +2,13 @@ use crate::config::Upstream;
use crate::servers::protocol::tls::get_sni; use crate::servers::protocol::tls::get_sni;
use crate::servers::Proxy; use crate::servers::Proxy;
use futures::future::try_join; use futures::future::try_join;
use log::{debug, error, warn}; use log::{debug, error, info, warn};
use std::sync::Arc; use std::sync::Arc;
use tokio::io; use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
pub async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> { pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(config.listen).await?; let listener = TcpListener::bind(config.listen).await?;
let config = config.clone(); let config = config.clone();
@ -34,20 +34,20 @@ pub async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>>
} }
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> { async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
debug!("New connection from {:?}", inbound.peer_addr()?); info!("New connection from {:?}", inbound.peer_addr()?);
let upstream_name = match proxy.tls { let upstream_name = match proxy.tls {
false => proxy.default.clone(), false => proxy.default_action.clone(),
true => { true => {
let mut hello_buf = [0u8; 1024]; let mut hello_buf = [0u8; 1024];
inbound.peek(&mut hello_buf).await?; inbound.peek(&mut hello_buf).await?;
let snis = get_sni(&hello_buf); let snis = get_sni(&hello_buf);
if snis.is_empty() { if snis.is_empty() {
proxy.default.clone() proxy.default_action.clone()
} else { } else {
match proxy.sni.clone() { match proxy.sni.clone() {
Some(sni_map) => { Some(sni_map) => {
let mut upstream = proxy.default.clone(); let mut upstream = proxy.default_action.clone();
for sni in snis { for sni in snis {
let m = sni_map.get(&sni); let m = sni_map.get(&sni);
if m.is_some() { if m.is_some() {
@ -57,7 +57,7 @@ async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std
} }
upstream upstream
} }
None => proxy.default.clone(), None => proxy.default_action.clone(),
} }
} }
} }
@ -70,13 +70,14 @@ async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std
None => { None => {
warn!( warn!(
"No upstream named {:?} on server {:?}", "No upstream named {:?} on server {:?}",
proxy.default, proxy.name proxy.default_action, proxy.name
); );
return process(inbound, proxy.upstream.get(&proxy.default).unwrap()).await; return process(inbound, proxy.upstream.get(&proxy.default_action).unwrap()).await;
// ToDo: Remove unwrap and check default option // ToDo: Remove unwrap and check default option
} }
}; };
return process(inbound, upstream).await;
return process(inbound, &upstream).await;
} }
async fn process( async fn process(
@ -93,9 +94,18 @@ async fn process(
let bytes_tx = inbound_to_inbound.await; let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx); debug!("Bytes read: {:?}", bytes_tx);
} }
Upstream::Custom(custom) => match custom.protocol.as_ref() { Upstream::Custom(custom) => {
"tcp" => { let outbound = match custom.protocol.as_ref() {
let outbound = TcpStream::connect(custom.addr.clone()).await?; "tcp4" | "tcp6" | "tcp" => {
TcpStream::connect(custom.resolve_addresses().await?.as_slice()).await?
}
_ => {
error!("Reached unknown protocol: {:?}", custom.protocol);
return Err("Reached unknown protocol".into());
}
};
debug!("Connected to {:?}", outbound.peer_addr().unwrap());
let (mut ri, mut wi) = io::split(inbound); let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound); let (mut ro, mut wo) = io::split(outbound);
@ -103,15 +113,10 @@ async fn process(
let inbound_to_outbound = copy(&mut ri, &mut wo); let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi); let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) = let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx); debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
} }
_ => {
error!("Reached unknown protocol: {:?}", custom.protocol);
}
},
}; };
Ok(()) Ok(())
} }

View File

@ -49,6 +49,7 @@ pub fn get_sni(buf: &[u8]) -> Vec<String> {
} }
} }
debug!("Found SNIs: {:?}", &snis);
snis snis
} }

View File

@ -0,0 +1,141 @@
use log::debug;
use std::fmt::{Display, Formatter};
use std::io::Result;
use std::net::SocketAddr;
use time::{Duration, Instant, OffsetDateTime};
#[derive(Debug, Clone, Default)]
pub(crate) struct UpstreamAddress {
address: String,
resolved_addresses: Vec<SocketAddr>,
resolved_time: Option<Instant>,
ttl: Option<Duration>,
}
impl Display for UpstreamAddress {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.address.fmt(f)
}
}
impl UpstreamAddress {
pub fn new(address: String) -> Self {
UpstreamAddress {
address,
..Default::default()
}
}
pub fn is_valid(&self) -> bool {
if let Some(resolved) = self.resolved_time {
if let Some(ttl) = self.ttl {
return resolved.elapsed() < ttl;
}
}
false
}
fn is_resolved(&self) -> bool {
self.resolved_addresses.len() > 0
}
fn time_remaining(&self) -> Duration {
if !self.is_valid() {
return Duration::seconds(0);
}
self.ttl.unwrap() - self.resolved_time.unwrap().elapsed()
}
pub async fn resolve(&mut self, mode: ResolutionMode) -> Result<Vec<SocketAddr>> {
if self.is_resolved() && self.is_valid() {
debug!(
"Already got address {:?}, still valid for {:.3}s",
&self.resolved_addresses,
self.time_remaining().as_seconds_f64()
);
return Ok(self.resolved_addresses.clone());
}
debug!(
"Resolving addresses for {} with mode {:?}",
&self.address, &mode
);
let lookup_result = tokio::net::lookup_host(&self.address).await;
let resolved_addresses: Vec<SocketAddr> = match lookup_result {
Ok(resolved_addresses) => resolved_addresses.into_iter().collect(),
Err(e) => {
debug!("Failed looking up {}: {}", &self.address, &e);
// Protect against DNS flooding. Cache the result for 1 second.
self.resolved_time = Some(Instant::now());
self.ttl = Some(Duration::seconds(3));
return Err(e);
}
};
debug!("Resolved addresses: {:?}", &resolved_addresses);
let addresses: Vec<SocketAddr> = match mode {
ResolutionMode::Ipv4 => resolved_addresses
.into_iter()
.filter(|a| a.is_ipv4())
.collect(),
ResolutionMode::Ipv6 => resolved_addresses
.into_iter()
.filter(|a| a.is_ipv6())
.collect(),
_ => resolved_addresses,
};
debug!(
"Got {} addresses for {}: {:?}",
&mode, &self.address, &addresses
);
debug!(
"Resolved at {}",
OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.expect("Format")
);
self.resolved_addresses = addresses;
self.resolved_time = Some(Instant::now());
self.ttl = Some(Duration::minutes(1));
Ok(self.resolved_addresses.clone())
}
}
#[derive(Debug, Default, Clone)]
pub(crate) enum ResolutionMode {
#[default]
Ipv4AndIpv6,
Ipv4,
Ipv6,
}
impl From<&str> for ResolutionMode {
fn from(value: &str) -> Self {
match value {
"tcp4" => ResolutionMode::Ipv4,
"tcp6" => ResolutionMode::Ipv6,
"tcp" => ResolutionMode::Ipv4AndIpv6,
_ => panic!("This should never happen. Please check configuration parser."),
}
}
}
impl Display for ResolutionMode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ResolutionMode::Ipv4 => write!(f, "IPv4Only"),
ResolutionMode::Ipv6 => write!(f, "IPv6Only"),
ResolutionMode::Ipv4AndIpv6 => write!(f, "IPv4 and IPv6"),
}
}
}