55 Commits

Author SHA1 Message Date
ad6955a30d Fix crate name and release v0.1.8
All checks were successful
continuous-integration/drone Build is passing
continuous-integration/drone/tag Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-06-19 21:16:30 +02:00
4592c94586 Reintroduce L4P_CONFIG environment variable
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
This points to a user-configured configuration file.

Closes #5.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 23:53:47 +01:00
6284870059 Rename config::config to config::config_v1
To prevent module inception, which was a clippy warning.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 23:34:19 +01:00
97b4bf6bbe Solve synchronization issue
The async mutex in the previous variant would fail when used in a single
threaded mode, because block_in_place() cannot be used there.

Instead, replace the code with a Arc<RwLock> inside of the
UpstreamAddress to let that class take care of its own mutability.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 23:31:23 +01:00
59c7128f93 Remove kcp support
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 22:49:43 +01:00
9d9f89881d Improve config file handling
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 22:03:25 +01:00
ee67f7883e Rename to l4p, update references and README.md
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 22:03:25 +01:00
77bc8364f2 Update dependencies to latest versions
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-22 21:59:14 +01:00
ec9ab1d2bc Add example systemd unit with security protections
This is just about as secure as this process can get

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-22 21:49:58 +01:00
bb81a32349 Deduplicate some code
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 13:34:40 +02:00
17b39dc6bc Prepare for new config version
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 13:34:40 +02:00
07fccb6b2a Clippy
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 00:26:19 +02:00
3a2367ef28 Moved upstreams to their own dedicated namespace
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 00:26:06 +02:00
2116659a14 Sort dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 23:34:26 +02:00
8404f38182 Move ProxyToUpstream parsing to TryFrom trait
This seems cleaner to me than parsing it externally.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 23:27:42 +02:00
23296c6436 Improve code style
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 23:27:42 +02:00
84f0499ec8 Remove unnecessary manual Default implementations
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:32:36 +02:00
ae594135a1 Update dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:31:43 +02:00
9564fbed6e Fix clippy warnings
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:51 +02:00
a574163aef Rename Upstream::Custom to Upstream::Proxy
And CustomUpstream to ProxyToUpstream.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:51 +02:00
2651ec1f4a Fix kcp module
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:51 +02:00
8dae1126d5 Deduplicate copy method
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:50 +02:00
da46c5873f Fix typo
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 20:48:01 +02:00
086e2b4766 Tag 0.1.7
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Critical bug fixes

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 23:11:29 +02:00
5f0de72b88 Remove unused variable
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:56:46 +02:00
40b890bc13 Add much better debug logging of address resolution
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:54:41 +02:00
483c058105 Slightly better way of finding the config file
It now also looks in the current working directory.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:53:46 +02:00
6349fc6502 Prevent unnecessary clone
This also ensures that the address resolver actually keeps state.
Otherwise it was cloned before each resolution, resulting in it never
keeping the resolved addresses.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:52:46 +02:00
cd35859c9b Initialize UpstreamAddress with actual address
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-25 22:51:25 +02:00
7f399af713 Update rust and zig
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-24 18:45:04 +02:00
fd86162450 Version 0.1.6
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:52:46 +02:00
a6748f30d9 Make English readme the default
Some checks reported errors
continuous-integration/drone/pr Build was killed
continuous-integration/drone/push Build is passing
Since I'm unable to read Chinese

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:45:42 +02:00
902b2c0d55 Update build file
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:33:18 +02:00
fb7a7d9cae Update gitignore
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 20:33:07 +02:00
1c325f45b4 Add sample configuration file
Some checks failed
continuous-integration/drone Build is failing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 19:29:49 +02:00
79c931fc38 Add build instructions
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-23 19:29:29 +02:00
915e39b684 Extract DNS address resolution
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-16 09:32:05 +02:00
0c5153bbd6 Rename Proxy::default to ::default_action
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-08-16 09:31:20 +02:00
01784ee3fd Update dependencies 2023-08-16 09:29:18 +02:00
f4bc441ca8 Enable explicit ipv4 / ipv6 proxying
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-06-02 17:35:29 +02:00
f010f8c76b Update dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-06-02 17:35:13 +02:00
8fbc0c370a Add error messages when failed to start server 2021-12-30 22:05:25 +08:00
bff92738d5 Allow config path from FOURTH_CONFIG 2021-11-01 16:06:47 +08:00
754a5af794 Add publish CI and run fmt 2021-11-01 15:56:57 +08:00
fc7a3038bd Add unknown protocol error 2021-11-01 15:32:08 +08:00
8a96de9666 Update README and minor refactor 2021-11-01 15:25:12 +08:00
0407f4b40c Add config validation 2021-11-01 13:45:47 +08:00
47be2568ba Add upstream scheme support
Need to implement TCP and UDP upstream support.
2021-10-31 19:21:32 +08:00
5944beb6a2 Combine TCP and KCP tests 2021-10-27 08:36:24 +08:00
4363e3f76a Publish 0.1.3 and update README 2021-10-26 23:58:00 +08:00
ee9d0685b3 Refactor TCP and KCP test 2021-10-26 23:52:07 +08:00
421ad8c979 Fix example config 2021-10-26 23:27:03 +08:00
a88a263d20 Move tokio_kcp to local files 2021-10-26 23:02:05 +08:00
bfce455a7e Add Cargo installation method 2021-10-26 21:40:40 +08:00
55eef8581c Add KCP support 2021-10-26 21:36:12 +08:00
23 changed files with 1477 additions and 616 deletions

3
.cargo/config.toml Normal file
View File

@ -0,0 +1,3 @@
[profile.release]
lto = "thin"
strip = true

92
.drone.jsonnet Normal file
View File

@ -0,0 +1,92 @@
local executableName = 'l4p';
local build_image = 'img.kie.rs/jjkiers/rust-cross:rust1.71.1-zig';
local archs = [
{ target: 'aarch64-unknown-linux-musl', short: 'arm64-musl' },
{ target: 'x86_64-pc-windows-gnu', short: 'windows' },
{ target: 'x86_64-unknown-linux-musl', short: 'amd64-musl' },
];
local getStepName(arch) = 'Build for ' + arch.short;
local builtExecutableName(arch) = executableName + if std.length(std.findSubstr(arch.short, 'windows')) > 0 then '.exe' else '';
local targetExecutableName(arch) = executableName + '-' + arch.target + if std.length(std.findSubstr(arch.short, 'windows')) > 0 then '.exe' else '';
local getVolumeName(arch) = 'target-' + arch.target;
local getLocalVolumes(arch) = [
{
name: getVolumeName(arch),
temp: {},
}
for arch in archs
];
local add_build_steps() = [
{
name: getStepName(arch),
image: build_image,
commands: [
'echo Hello World from Jsonnet on ' + arch.target + '!',
'cargo zigbuild --release --target ' + arch.target,
'cp target/' + arch.target + '/release/' + builtExecutableName(arch) + ' artifacts/' + targetExecutableName(arch),
'rm -rf target/' + arch.target + '/release/*',
],
depends_on: ['Prepare'],
volumes: [{
name: getVolumeName(arch),
path: '/drone/src/target',
}],
}
for arch in archs
];
{
kind: 'pipeline',
type: 'docker',
name: 'default',
platform: {
arch: 'amd64',
},
steps:
[{
name: 'Prepare',
image: build_image,
commands: [
'mkdir artifacts',
'echo Using image: ' + build_image,
'cargo --version',
'rustc --version',
],
}] +
add_build_steps() +
[
{
name: 'Show built artifacts',
image: build_image,
commands: [
'ls -lah artifacts',
],
depends_on: [getStepName(a) for a in archs],
},
{
name: 'Create release on gitea',
image: 'plugins/gitea-release',
settings: {
api_key: {
from_secret: 'gitea_token',
},
base_url: 'https://code.kiers.eu',
files: 'artifacts/*',
checksum: 'sha256',
},
when: {
event: ['tag', 'promote'],
},
depends_on: ['Show built artifacts'],
},
],
volumes: getLocalVolumes(archs),
image_pull_secrets: ['docker_private_repo'],
}

View File

@ -1,24 +0,0 @@
name: Rust
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Upgrade Rust
run: rustup update
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

1
.gitignore vendored
View File

@ -1 +1,2 @@
/target /target
config.yaml

779
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,24 +1,34 @@
[package] [package]
name = "fourth" name = "l4p"
version = "0.1.1" version = "0.1.8"
edition = "2021" edition = "2021"
authors = ["LI Rui <lr_cn@outlook.com>"] authors = ["Jacob Kiers <code@kiers.eu>"]
license = "Apache-2.0" license = "Apache-2.0"
description = "Simple and fast layer 4 proxy in Rust" description = "Simple and fast layer 4 proxy in Rust"
readme = "README.md" readme = "README.md"
homepage = "https://github.com/KernelErr/fourth" homepage = "https://code.kiers.eu/jjkiers/layer4-proxy"
repository = "https://github.com/KernelErr/fourth" repository = "https://code.kiers.eu/jjkiers/layer4-proxy"
keywords = ["proxy", "network"] keywords = ["proxy", "network"]
categories = ["web-programming"] categories = ["web-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] exclude = [".*"]
log = "0.4"
pretty_env_logger = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8"
futures = "0.3"
tls-parser = "0.11"
tokio = { version = "1.0", features = ["full"] } [[bin]]
name = "l4p"
path = "src/main.rs"
[dependencies]
async-trait = "0.1.73"
byte_string = "1"
bytes = "1.1"
futures = "0.3"
log = "0.4"
pretty_env_logger = "0.5"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9.21"
time = { version = "0.3.1", features = ["local-offset", "formatting"] }
tls-parser = "0.11"
tokio = { version = "1.0", features = ["full"] }
url = "2.2.2"

View File

@ -1,58 +0,0 @@
# Fourth
> Hey, now we are on level 4!
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer data to remote addresses according to configuration.
## Features
- Listen on specific port and proxy to local or remote port
- SNI-based rule without terminating TLS connection
## Installation
To gain best performance on your computer's architecture, please consider build the source code. First, you may need [Rust tool chain](https://rustup.rs/).
```bash
$ cd fourth
$ cargo build --release
```
Binary file will be generated at `target/release/fourth`, or you can use `cargo install --path .` to install.
## Configuration
Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, here is an example:
```yaml
version: 1
log: info
servers:
example_server:
listen:
- "0.0.0.0:443"
- "[::]:443"
tls: true # Enable TLS features like SNI
sni:
proxy.example.com: proxy
www.example.com: nginx
default: ban
relay_server:
listen:
- "127.0.0.1:8081"
default: remote
upstream:
nginx: "127.0.0.1:8080"
proxy: "127.0.0.1:1024"
other: "www.remote.example.com:8082" # proxy to remote address
```
Built-in two upstreams: ban(terminate connection immediately), echo
## License
Fourth is available under terms of Apache-2.0.

View File

@ -1,66 +1,64 @@
# Fourth # l4p
> 这一波在第四层。 > Hey, now we are on level 4!
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml) ![CI](https://drone-ci.kiers.eu/api/badges/jjkiers/layer4-proxy/status.svg)
[English](/README-EN.md) `l4p` is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP data to remote addresses (only TCP) according to the configuration.
Fourth是一个Rust实现的Layer 4代理用于监听指定端口TCP流量并根据规则转发到指定目标。 ## Features
## 功能 - Listen on specific port and proxy to local or remote port
- SNI-based rule without terminating TLS connection
- DNS-based backend with periodic resolution
- 监听指定端口代理到本地或远端指定端口 ## Installation
- 监听指定端口通过TLS ClientHello消息中的SNI进行分流
## 安装方法 To gain best performance on your computer's architecture, please consider build the source code. First, you may need [Rust tool chain](https://rustup.rs/).
为了确保获得您架构下的最佳性能,请考虑自行编译,首选需要确保您拥有[Rust工具链](https://rustup.rs/)。
```bash ```bash
$ cd fourth $ cd l4p
$ cargo build --release $ cargo build --release
``` ```
将在`target/release/fourth`生成二进制文件,您也可以使用`cargo install --path . `来安装二进制文件。 Binary file will be generated at `target/release/l4p`, or you can use `cargo install --path .` to install.
## 配置 Or you can use Cargo to install `l4p`:
Fourth使用yaml格式的配置文件默认情况下会读取`/etc/fourth/config.yaml`,如下是一个示例配置。 ```bash
$ cargo install l4p
```
Or you can download binary file form the Release page.
## Configuration
`l4p` will read yaml format configuration file from `/etc/l4p/l4p.yaml`, and you can set custom path to environment variable `L4P_CONFIG`, here is an minimal viable example:
```yaml ```yaml
version: 1 version: 1
log: info log: info
servers: servers:
example_server: proxy_server:
listen:
- "0.0.0.0:443"
- "[::]:443"
tls: true # 启动SNI分流将根据TLS请求中的主机名分流
sni:
proxy.example.com: proxy
www.example.com: nginx
default: ban
relay_server:
listen: listen:
- "127.0.0.1:8081" - "127.0.0.1:8081"
default: remote default: remote
upstream: upstream:
nginx: "127.0.0.1:8080" remote: "tcp://www.remote.example.com:8082" # proxy to remote address
proxy: "127.0.0.1:1024"
other: "www.remote.example.com:8082" # 代理到远端地址
``` ```
内置两个的upstreamban立即中断连接、echo返回读到的数据 There are two upstreams built in:
* Ban, which terminates the connection immediately
* Echo, which reflects back with the input
## io_uring? For detailed configuration, check [this example](./config.yaml.example).
尽管经过了很多尝试我们发现目前一些Rust下面的io_uring实现存在问题我们使用的io_uring库实现尽管在吞吐量上可以做到单线程20Gbps相比之下Tokio仅有8Gbps但在QPS上存在性能损失较大的问题。因此在有成熟的io_uring实现之前我们仍然选择epoll。之后我们会持续关注相关进展。 ## Thanks
可能以后会为Linux高内核版本的用户提供可选的io_uring加速。 - [`l4p`](https://crates.io/crates/`l4p`), of which this is a heavily modified fork.
## 协议 ## License
Fourth以Apache-2.0协议开源。 `l4p` is available under terms of Apache-2.0.

21
config.yaml.example Normal file
View File

@ -0,0 +1,21 @@
version: 1
log: debug
servers:
first_server:
listen:
- "0.0.0.0:8443"
- "[::]:8443"
tls: true # Enable TLS features like SNI filtering
sni:
api.example.org: example-api
www.example.org: proxy
default: ban
second-server:
listen: [ "127.0.0.1:8080" ]
default: echo
upstream:
proxy: "tcp://new-www.example.org:443" # Connect over IPv4 or IPv6 to new-www.example.org:443
example-api: "tcp6://api-v1.example.com:443" # Connect over IPv6 to api-v1.example.com:443

View File

@ -17,6 +17,6 @@ servers:
default: remote default: remote
upstream: upstream:
nginx: "127.0.0.1:8080" nginx: "tcp://127.0.0.1:8080"
proxy: "127.0.0.1:1024" proxy: "tcp://127.0.0.1:1024"
other: "www.remote.example.com:8082" # proxy to remote address remote: "tcp://www.remote.example.com:8082" # proxy to remote address

51
l4p.service Normal file
View File

@ -0,0 +1,51 @@
[Unit]
Description=l4p - Layer 4 proxy
After=network-online.target
Wants=network-online.target
[Install]
WantedBy=default.target
[Service]
Type=simple
# Allow read-only access to the config directory
ReadOnlyPaths=/etc/l4p
# Path to the binary
ExecStart=/usr/local/bin/l4p
# Needs CAP_NET_BIND_SERVICE in order to bind to lower ports
# When using ports above 1024, these should be made empty
AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
# Run as a dynamic user
DynamicUser=yes
# Security
PrivateTmp=yes
PrivateDevices=yes
ProtectSystem=strict
ProtectHome=yes
SystemCallFilter=@basic-io @file-system @network-io @system-service
SystemCallFilter=~@privileged
SystemCallFilter=~@resources
NoNewPrivileges=yes
ProtectProc=invisible
RemoveIPC=yes
RestrictAddressFamilies=AF_INET AF_INET6
RestrictNamespaces=yes
ProtectHostname=yes
ProtectClock=yes
ProtectKernelModules=yes
ProtectKernelLogs=yes
ProtectControlGroups=yes
LockPersonality=yes
MemoryDenyWriteExecute=yes
RestrictRealtime=yes
ProcSubset=pid
UMask=0077
SystemCallArchitectures=native
RestrictSUIDSGID=yes
ProtectKernelTunables=yes

View File

@ -1,92 +0,0 @@
use log::debug;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::io::{Error as IOError, Read};
#[derive(Debug, Clone)]
pub struct Config {
pub base: BaseConfig,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct BaseConfig {
pub version: i32,
pub log: Option<String>,
pub servers: HashMap<String, ServerConfig>,
pub upstream: HashMap<String, String>,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct ServerConfig {
pub listen: Vec<String>,
pub tls: Option<bool>,
pub sni: Option<HashMap<String, String>>,
pub default: Option<String>,
}
#[derive(Debug)]
pub enum ConfigError {
IO(IOError),
Yaml(serde_yaml::Error),
Custom(String),
}
impl Config {
pub fn new(path: &str) -> Result<Config, ConfigError> {
let base = (load_config(path))?;
Ok(Config { base })
}
}
fn load_config(path: &str) -> Result<BaseConfig, ConfigError> {
let mut contents = String::new();
let mut file = (File::open(path))?;
(file.read_to_string(&mut contents))?;
let parsed: BaseConfig = serde_yaml::from_str(&contents)?;
if parsed.version != 1 {
return Err(ConfigError::Custom(
"Unsupported config version".to_string(),
));
}
let log_level = parsed.log.clone().unwrap_or_else(|| "info".to_string());
if !log_level.eq("disable") {
std::env::set_var("FOURTH_LOG", log_level.clone());
pretty_env_logger::init_custom_env("FOURTH_LOG");
debug!("Set log level to {}", log_level);
}
debug!("Config version {}", parsed.version);
Ok(parsed)
}
impl From<IOError> for ConfigError {
fn from(err: IOError) -> ConfigError {
ConfigError::IO(err)
}
}
impl From<serde_yaml::Error> for ConfigError {
fn from(err: serde_yaml::Error) -> ConfigError {
ConfigError::Yaml(err)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_config() {
let config = Config::new("tests/config.yaml").unwrap();
assert_eq!(config.base.version, 1);
assert_eq!(config.base.log.unwrap(), "disable");
assert_eq!(config.base.servers.len(), 2);
assert_eq!(config.base.upstream.len(), 2);
}
}

228
src/config/config_v1.rs Normal file
View File

@ -0,0 +1,228 @@
use crate::upstreams::ProxyToUpstream;
use crate::upstreams::Upstream;
use log::{debug, info, warn};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{Error as IOError, Read};
use url::Url;
#[derive(Debug, Clone)]
pub struct ConfigV1 {
pub base: ParsedConfigV1,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct ParsedConfigV1 {
pub version: i32,
pub log: Option<String>,
pub servers: HashMap<String, ServerConfig>,
pub upstream: HashMap<String, Upstream>,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct BaseConfig {
pub version: i32,
pub log: Option<String>,
pub servers: HashMap<String, ServerConfig>,
pub upstream: HashMap<String, String>,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct ServerConfig {
pub listen: Vec<String>,
pub protocol: Option<String>,
pub tls: Option<bool>,
pub sni: Option<HashMap<String, String>>,
pub default: Option<String>,
}
impl TryInto<ProxyToUpstream> for &str {
type Error = ConfigError;
fn try_into(self) -> Result<ProxyToUpstream, Self::Error> {
let upstream_url = match Url::parse(self) {
Ok(url) => url,
Err(_) => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
self
)))
}
};
let upstream_host = match upstream_url.host_str() {
Some(host) => host,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
self
)))
}
};
let upstream_port = match upstream_url.port_or_known_default() {
Some(port) => port,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
self
)))
}
};
match upstream_url.scheme() {
"tcp" | "tcp4" | "tcp6" => {}
_ => {
return Err(ConfigError::Custom(format!(
"Invalid upstream scheme {}",
self
)))
}
}
Ok(ProxyToUpstream::new(
format!("{}:{}", upstream_host, upstream_port),
upstream_url.scheme().to_string(),
))
}
}
#[derive(Debug)]
pub enum ConfigError {
IO(IOError),
Yaml(serde_yaml::Error),
Custom(String),
}
impl ConfigV1 {
pub fn new(path: &str) -> Result<ConfigV1, ConfigError> {
let base = load_config(path)?;
Ok(ConfigV1 { base })
}
}
fn load_config(path: &str) -> Result<ParsedConfigV1, ConfigError> {
let mut contents = String::new();
let mut file = File::open(path)?;
file.read_to_string(&mut contents)?;
let base: BaseConfig = serde_yaml::from_str(&contents)?;
if base.version != 1 {
return Err(ConfigError::Custom(
"Unsupported config version".to_string(),
));
}
let log_level = base.log.clone().unwrap_or_else(|| "info".to_string());
if !log_level.eq("disable") {
std::env::set_var("FOURTH_LOG", log_level.clone());
pretty_env_logger::init_custom_env("FOURTH_LOG");
}
info!("Using config file: {}", &path);
debug!("Set log level to {}", log_level);
debug!("Config version {}", base.version);
let mut parsed_upstream: HashMap<String, Upstream> = HashMap::new();
parsed_upstream.insert("ban".to_string(), Upstream::Ban);
parsed_upstream.insert("echo".to_string(), Upstream::Echo);
for (name, upstream) in base.upstream.iter() {
let ups = upstream.as_str().try_into()?;
parsed_upstream.insert(name.to_string(), Upstream::Proxy(ups));
}
let parsed = ParsedConfigV1 {
version: base.version,
log: base.log,
servers: base.servers,
upstream: parsed_upstream,
};
verify_config(parsed)
}
fn verify_config(config: ParsedConfigV1) -> Result<ParsedConfigV1, ConfigError> {
let mut used_upstreams: HashSet<String> = HashSet::new();
let mut upstream_names: HashSet<String> = HashSet::new();
let mut listen_addresses: HashSet<String> = HashSet::new();
// Check for duplicate upstream names
for (name, _) in config.upstream.iter() {
if upstream_names.contains(name) {
return Err(ConfigError::Custom(format!(
"Duplicate upstream name {}",
name
)));
}
upstream_names.insert(name.to_string());
}
for (_, server) in config.servers.clone() {
// check for duplicate listen addresses
for listen in server.listen {
if listen_addresses.contains(&listen) {
return Err(ConfigError::Custom(format!(
"Duplicate listen address {}",
listen
)));
}
listen_addresses.insert(listen.to_string());
}
if server.tls.unwrap_or_default() && server.sni.is_some() {
for (_, val) in server.sni.unwrap() {
used_upstreams.insert(val.to_string());
}
}
if server.default.is_some() {
used_upstreams.insert(server.default.unwrap().to_string());
}
for key in &used_upstreams {
if !config.upstream.contains_key(key) {
return Err(ConfigError::Custom(format!("Upstream {} not found", key)));
}
}
}
for key in &upstream_names {
if !used_upstreams.contains(key) && !key.eq("echo") && !key.eq("ban") {
warn!("Upstream {} not used", key);
}
}
Ok(config)
}
impl From<IOError> for ConfigError {
fn from(err: IOError) -> ConfigError {
ConfigError::IO(err)
}
}
impl From<serde_yaml::Error> for ConfigError {
fn from(err: serde_yaml::Error) -> ConfigError {
ConfigError::Yaml(err)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_config() {
let config = ConfigV1::new("tests/config.yaml").unwrap();
assert_eq!(config.base.version, 1);
assert_eq!(config.base.log.unwrap(), "disable");
assert_eq!(config.base.servers.len(), 3);
assert_eq!(config.base.upstream.len(), 3 + 2); // Add ban and echo upstreams
}
}

3
src/config/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod config_v1;
pub(crate) use config_v1::ConfigV1;
pub(crate) use config_v1::ParsedConfigV1;

View File

@ -1,13 +1,26 @@
mod config; mod config;
mod servers; mod servers;
mod upstreams;
use crate::config::Config; use crate::config::ConfigV1;
use crate::servers::Server; use crate::servers::Server;
use log::{debug, error}; use log::{debug, error, info};
use std::path::PathBuf;
fn main() { fn main() {
let config = match Config::new("/etc/fourth/config.yaml") { let config_path = match find_config() {
Ok(p) => p,
Err(paths) => {
println!("Could not find config file. Tried paths:");
for p in paths {
println!("- {}", p);
}
std::process::exit(1);
}
};
let config = match ConfigV1::new(&config_path) {
Ok(config) => config, Ok(config) => config,
Err(e) => { Err(e) => {
println!("Could not load config: {:?}", e); println!("Could not load config: {:?}", e);
@ -16,9 +29,43 @@ fn main() {
}; };
debug!("{:?}", config); debug!("{:?}", config);
let mut server = Server::new(config.base); let mut server = Server::new_from_v1_config(config.base);
debug!("{:?}", server); debug!("{:?}", server);
let res = server.run(); let _ = server.run();
error!("Server returned an error: {:?}", res); error!("Server ended with errors");
}
fn find_config() -> Result<String, Vec<String>> {
let possible_locations = ["/etc/l4p", ""];
let possible_names = ["l4p.yaml", "config.yaml"];
let mut tried_paths = Vec::<String>::new();
let mut possible_paths = Vec::<PathBuf>::new();
if let Ok(env_path) = std::env::var("L4P_CONFIG") {
possible_paths.push(PathBuf::from(env_path));
}
possible_paths.append(
&mut possible_locations
.iter()
.flat_map(|&path| {
possible_names
.iter()
.map(move |&file| PathBuf::new().join(path).join(file))
})
.collect::<Vec<PathBuf>>(),
);
for path in possible_paths {
let path_str = path.to_string_lossy().to_string();
if path.exists() {
return Ok(path_str);
}
tried_paths.push(path_str);
}
Err(tried_paths)
} }

View File

@ -1,41 +1,41 @@
use futures::future::try_join; use log::{error, info};
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
mod tls; mod protocol;
use self::tls::get_sni; pub(crate) mod upstream_address;
use crate::config::BaseConfig;
use crate::config::ParsedConfigV1;
use crate::upstreams::Upstream;
use protocol::tcp;
#[derive(Debug)] #[derive(Debug)]
pub struct Server { pub(crate) struct Server {
pub proxies: Vec<Arc<Proxy>>, pub proxies: Vec<Arc<Proxy>>,
pub config: BaseConfig,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Proxy { pub(crate) struct Proxy {
pub name: String, pub name: String,
pub listen: SocketAddr, pub listen: SocketAddr,
pub protocol: String,
pub tls: bool, pub tls: bool,
pub sni: Option<HashMap<String, String>>, pub sni: Option<HashMap<String, String>>,
pub default: String, pub default_action: String,
pub upstream: HashMap<String, String>, pub upstream: HashMap<String, Upstream>,
} }
impl Server { impl Server {
pub fn new(config: BaseConfig) -> Self { pub fn new_from_v1_config(config: ParsedConfigV1) -> Self {
let mut new_server = Server { let mut new_server = Server {
proxies: Vec::new(), proxies: Vec::new(),
config: config.clone(),
}; };
for (name, proxy) in config.servers.iter() { for (name, proxy) in config.servers.iter() {
let protocol = proxy.protocol.clone().unwrap_or_else(|| "tcp".to_string());
let tls = proxy.tls.unwrap_or(false); let tls = proxy.tls.unwrap_or(false);
let sni = proxy.sni.clone(); let sni = proxy.sni.clone();
let default = proxy.default.clone().unwrap_or_else(|| "ban".to_string()); let default = proxy.default.clone().unwrap_or_else(|| "ban".to_string());
@ -48,7 +48,6 @@ impl Server {
upstream_set.insert(key.clone()); upstream_set.insert(key.clone());
} }
for listen in proxy.listen.clone() { for listen in proxy.listen.clone() {
println!("{:?}", listen);
let listen_addr: SocketAddr = match listen.parse() { let listen_addr: SocketAddr = match listen.parse() {
Ok(addr) => addr, Ok(addr) => addr,
Err(_) => { Err(_) => {
@ -56,12 +55,14 @@ impl Server {
continue; continue;
} }
}; };
let proxy = Proxy { let proxy = Proxy {
name: name.clone(), name: name.clone(),
listen: listen_addr, listen: listen_addr,
protocol: protocol.clone(),
tls, tls,
sni: sni.clone(), sni: sni.clone(),
default: default.clone(), default_action: default.clone(),
upstream: upstream.clone(), upstream: upstream.clone(),
}; };
new_server.proxies.push(Arc::new(proxy)); new_server.proxies.push(Arc::new(proxy));
@ -77,9 +78,22 @@ impl Server {
let mut handles: Vec<JoinHandle<()>> = Vec::new(); let mut handles: Vec<JoinHandle<()>> = Vec::new();
for config in proxies { for config in proxies {
info!("Starting server {} on {}", config.name, config.listen); info!(
"Starting {} server {} on {}",
config.protocol, config.name, config.listen
);
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
let _ = proxy(config).await; match config.protocol.as_ref() {
"tcp" | "tcp4" | "tcp6" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
_ => {
error!("Invalid protocol: {}", config.protocol)
}
}
}); });
handles.push(handle); handles.push(handle);
} }
@ -91,143 +105,67 @@ impl Server {
} }
} }
async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(config.listen).await?;
let config = config.clone();
loop {
let thread_proxy = config.clone();
match listener.accept().await {
Err(err) => {
error!("Failed to accept connection: {}", err);
return Err(Box::new(err));
}
Ok((stream, _)) => {
tokio::spawn(async move {
match accept(stream, thread_proxy).await {
Ok(_) => {}
Err(err) => {
error!("Relay thread returned an error: {}", err);
}
};
});
}
}
}
}
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
debug!("New connection from {:?}", inbound.peer_addr()?);
let upstream_name = match proxy.tls {
false => proxy.default.clone(),
true => {
let mut hello_buf = [0u8; 1024];
inbound.peek(&mut hello_buf).await?;
let snis = get_sni(&hello_buf);
if snis.is_empty() {
proxy.default.clone()
} else {
match proxy.sni.clone() {
Some(sni_map) => {
let mut upstream = proxy.default.clone();
for sni in snis {
let m = sni_map.get(&sni);
if m.is_some() {
upstream = m.unwrap().clone();
break;
}
}
upstream
}
None => proxy.default.clone(),
}
}
}
};
debug!("Upstream: {}", upstream_name);
let upstream = match proxy.upstream.get(&upstream_name) {
Some(upstream) => upstream,
None => {
warn!(
"No upstream named {:?} on server {:?}",
proxy.default, proxy.name
);
return process(inbound, &proxy.default).await;
}
};
return process(inbound, upstream).await;
}
async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box<dyn std::error::Error>> {
if upstream == "ban" {
let _ = inbound.shutdown();
return Ok(());
} else if upstream == "echo" {
loop {
let mut buf = [0u8; 1];
let b = inbound.read(&mut buf).await?;
if b == 0 {
break;
} else {
inbound.write(&buf).await?;
}
}
return Ok(());
}
let outbound = TcpStream::connect(upstream).await?;
let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound);
let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
Ok(())
}
async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
where
R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
match io::copy(reader, writer).await {
Ok(u64) => {
let _ = writer.shutdown().await;
Ok(u64)
}
Err(_) => Ok(0),
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod tests {
use std::thread::{self, sleep}; use std::thread::{self, sleep};
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use super::*; use super::*;
#[tokio::main]
async fn tcp_mock_server() {
let server_addr: SocketAddr = "127.0.0.1:54599".parse().unwrap();
let listener = TcpListener::bind(server_addr).await.unwrap();
loop {
let (mut stream, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 2];
let mut n = stream.read(&mut buf).await.unwrap();
while n > 0 {
let _ = stream.write(b"hello").await.unwrap();
if buf.eq(b"by") {
stream.shutdown().await.unwrap();
break;
}
n = stream.read(&mut buf).await.unwrap();
}
stream.shutdown().await.unwrap();
}
}
#[tokio::test] #[tokio::test]
async fn test_echo_server() { async fn test_proxy() {
use crate::config::Config; use crate::config::ConfigV1;
let config = Config::new("tests/config.yaml").unwrap(); let config = ConfigV1::new("tests/config.yaml").unwrap();
let mut server = Server::new(config.base); let mut server = Server::new_from_v1_config(config.base);
thread::spawn(move || {
tcp_mock_server();
});
sleep(Duration::from_secs(1)); // wait for server to start
thread::spawn(move || { thread::spawn(move || {
let _ = server.run(); let _ = server.run();
}); });
sleep(Duration::from_secs(1)); // wait for server to start sleep(Duration::from_secs(1)); // wait for server to start
let mut conn = TcpStream::connect("127.0.0.1:54956").await.unwrap();
// test TCP proxy
let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54500")
.await
.unwrap();
let mut buf = [0u8; 5];
let _ = conn.write(b"hi").await.unwrap();
let _ = conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
conn.shutdown().await.unwrap();
// test TCP echo
let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54956")
.await
.unwrap();
let mut buf = [0u8; 1]; let mut buf = [0u8; 1];
for i in 0..=255u8 { for i in 0..=10u8 {
conn.write(&[i]).await.unwrap(); let _ = conn.write(&[i]).await.unwrap();
conn.read(&mut buf).await.unwrap(); let _ = conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, &[i]); assert_eq!(&buf, &[i]);
} }
conn.shutdown().await.unwrap(); conn.shutdown().await.unwrap();

View File

@ -0,0 +1,2 @@
pub mod tcp;
pub mod tls;

View File

@ -0,0 +1,77 @@
use crate::servers::protocol::tls::get_sni;
use crate::servers::Proxy;
use log::{debug, error, info, warn};
use std::error::Error;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(config.listen).await?;
let config = config.clone();
loop {
let thread_proxy = config.clone();
match listener.accept().await {
Err(err) => {
error!("Failed to accept connection: {}", err);
return Err(Box::new(err));
}
Ok((stream, _)) => {
tokio::spawn(async move {
match accept(stream, thread_proxy).await {
Ok(_) => {}
Err(err) => {
error!("Relay thread returned an error: {}", err);
}
};
});
}
}
}
}
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn Error>> {
info!("New connection from {:?}", inbound.peer_addr()?);
let upstream_name = match proxy.tls {
false => proxy.default_action.clone(),
true => {
let mut hello_buf = [0u8; 1024];
inbound.peek(&mut hello_buf).await?;
let snis = get_sni(&hello_buf);
if snis.is_empty() {
proxy.default_action.clone()
} else {
match proxy.sni.clone() {
Some(sni_map) => {
let mut upstream = proxy.default_action.clone();
for sni in snis {
let m = sni_map.get(&sni);
if m.is_some() {
upstream = m.unwrap().clone();
break;
}
}
upstream
}
None => proxy.default_action.clone(),
}
}
}
};
debug!("Upstream: {}", upstream_name);
let upstream = match proxy.upstream.get(&upstream_name) {
Some(upstream) => upstream,
None => {
warn!(
"No upstream named {:?} on server {:?}",
proxy.default_action, proxy.name
);
proxy.upstream.get(&proxy.default_action).unwrap()
}
};
upstream.process(inbound).await
}

View File

@ -49,6 +49,7 @@ pub fn get_sni(buf: &[u8]) -> Vec<String> {
} }
} }
debug!("Found SNIs: {:?}", &snis);
snis snis
} }
@ -98,6 +99,6 @@ mod tests {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
]; ];
let sni = get_sni(&BUF); let sni = get_sni(&BUF);
assert!(sni[0] == "www.lirui.tech".to_string()); assert!(sni[0] == *"www.lirui.tech");
} }
} }

View File

@ -0,0 +1,147 @@
use log::debug;
use std::fmt::{Display, Formatter};
use std::io::Result;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::RwLock;
use time::{Duration, Instant, OffsetDateTime};
#[derive(Debug, Clone, Default)]
pub(crate) struct UpstreamAddress {
address: String,
resolved_addresses: Arc<RwLock<Vec<SocketAddr>>>,
resolved_time: Arc<RwLock<Option<Instant>>>,
ttl: Arc<RwLock<Option<Duration>>>,
}
impl Display for UpstreamAddress {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.address.fmt(f)
}
}
impl UpstreamAddress {
pub fn new(address: String) -> Self {
UpstreamAddress {
address,
..Default::default()
}
}
pub fn is_valid(&self) -> bool {
let r = { *self.resolved_time.read().unwrap() };
if let Some(resolved) = r {
if let Some(ttl) = { *self.ttl.read().unwrap() } {
return resolved.elapsed() < ttl;
}
}
false
}
fn is_resolved(&self) -> bool {
!self.resolved_addresses.read().unwrap().is_empty()
}
fn time_remaining(&self) -> Duration {
if !self.is_valid() {
return Duration::seconds(0);
}
let rt = { *self.resolved_time.read().unwrap() };
let ttl = { *self.ttl.read().unwrap() };
ttl.unwrap() - rt.unwrap().elapsed()
}
pub async fn resolve(&self, mode: ResolutionMode) -> Result<Vec<SocketAddr>> {
if self.is_resolved() && self.is_valid() {
debug!(
"Already got address {:?}, still valid for {:.3}s",
&self.resolved_addresses,
self.time_remaining().as_seconds_f64()
);
return Ok(self.resolved_addresses.read().unwrap().clone());
}
debug!(
"Resolving addresses for {} with mode {:?}",
&self.address, &mode
);
let lookup_result = tokio::net::lookup_host(&self.address).await;
let resolved_addresses: Vec<SocketAddr> = match lookup_result {
Ok(resolved_addresses) => resolved_addresses.into_iter().collect(),
Err(e) => {
debug!("Failed looking up {}: {}", &self.address, &e);
// Protect against DNS flooding. Cache the result for 1 second.
*self.resolved_time.write().unwrap() = Some(Instant::now());
*self.ttl.write().unwrap() = Some(Duration::seconds(3));
return Err(e);
}
};
debug!("Resolved addresses: {:?}", &resolved_addresses);
let addresses: Vec<SocketAddr> = match mode {
ResolutionMode::Ipv4 => resolved_addresses
.into_iter()
.filter(|a| a.is_ipv4())
.collect(),
ResolutionMode::Ipv6 => resolved_addresses
.into_iter()
.filter(|a| a.is_ipv6())
.collect(),
_ => resolved_addresses,
};
debug!(
"Got {} addresses for {}: {:?}",
&mode, &self.address, &addresses
);
debug!(
"Resolved at {}",
OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.expect("Format")
);
*self.resolved_addresses.write().unwrap() = addresses.clone();
*self.resolved_time.write().unwrap() = Some(Instant::now());
*self.ttl.write().unwrap() = Some(Duration::minutes(1));
Ok(addresses)
}
}
#[derive(Debug, Default, Clone)]
pub(crate) enum ResolutionMode {
#[default]
Ipv4AndIpv6,
Ipv4,
Ipv6,
}
impl From<&str> for ResolutionMode {
fn from(value: &str) -> Self {
match value {
"tcp4" => ResolutionMode::Ipv4,
"tcp6" => ResolutionMode::Ipv6,
"tcp" => ResolutionMode::Ipv4AndIpv6,
_ => panic!("This should never happen. Please check configuration parser."),
}
}
}
impl Display for ResolutionMode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ResolutionMode::Ipv4 => write!(f, "IPv4Only"),
ResolutionMode::Ipv6 => write!(f, "IPv6Only"),
ResolutionMode::Ipv4AndIpv6 => write!(f, "IPv4 and IPv6"),
}
}
}

51
src/upstreams/mod.rs Normal file
View File

@ -0,0 +1,51 @@
mod proxy_to_upstream;
use log::debug;
use serde::Deserialize;
use std::error::Error;
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
pub use crate::upstreams::proxy_to_upstream::ProxyToUpstream;
#[derive(Debug, Clone, Deserialize)]
pub enum Upstream {
Ban,
Echo,
Proxy(ProxyToUpstream),
}
impl Upstream {
pub(crate) async fn process(&self, mut inbound: TcpStream) -> Result<(), Box<dyn Error>> {
match self {
Upstream::Ban => {
inbound.shutdown().await?;
}
Upstream::Echo => {
let (mut ri, mut wi) = io::split(inbound);
let inbound_to_inbound = copy(&mut ri, &mut wi);
let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx);
}
Upstream::Proxy(config) => {
config.proxy(inbound).await?;
}
};
Ok(())
}
}
async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
where
R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
match io::copy(reader, writer).await {
Ok(u64) => {
let _ = writer.shutdown().await;
Ok(u64)
}
Err(_) => Ok(0),
}
}

View File

@ -0,0 +1,57 @@
use crate::servers::upstream_address::UpstreamAddress;
use crate::upstreams::copy;
use futures::future::try_join;
use log::{debug, error};
use serde::Deserialize;
use std::net::SocketAddr;
use tokio::io;
use tokio::net::TcpStream;
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ProxyToUpstream {
pub addr: String,
pub protocol: String,
#[serde(skip_deserializing)]
addresses: UpstreamAddress,
}
impl ProxyToUpstream {
pub async fn resolve_addresses(&self) -> std::io::Result<Vec<SocketAddr>> {
self.addresses.resolve((*self.protocol).into()).await
}
pub fn new(address: String, protocol: String) -> Self {
Self {
addr: address.clone(),
protocol,
addresses: UpstreamAddress::new(address),
}
}
pub(crate) async fn proxy(&self, inbound: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let outbound = match self.protocol.as_ref() {
"tcp4" | "tcp6" | "tcp" => {
TcpStream::connect(self.resolve_addresses().await?.as_slice()).await?
}
_ => {
error!("Reached unknown protocol: {:?}", self.protocol);
return Err("Reached unknown protocol".into());
}
};
debug!("Connected to {:?}", outbound.peer_addr().unwrap());
let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound);
let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
Ok(())
}
}

View File

@ -11,11 +11,16 @@ servers:
proxy.test.com: proxy proxy.test.com: proxy
www.test.com: web www.test.com: web
default: ban default: ban
echo_server: tcp_server:
listen:
- "127.0.0.1:54500"
default: tester
tcp_echo_server:
listen: listen:
- "0.0.0.0:54956" - "0.0.0.0:54956"
default: echo default: echo
upstream: upstream:
web: "127.0.0.1:8080" web: "tcp://127.0.0.1:8080"
proxy: "www.example.com:1024" proxy: "tcp://www.example.com:1024"
tester: "tcp://127.0.0.1:54599"