Compare commits
10 Commits
v0.1.3
...
f4bc441ca8
Author | SHA1 | Date | |
---|---|---|---|
f4bc441ca8 | |||
f010f8c76b | |||
![]() |
8fbc0c370a | ||
![]() |
bff92738d5 | ||
![]() |
754a5af794 | ||
![]() |
fc7a3038bd | ||
![]() |
8a96de9666 | ||
![]() |
0407f4b40c | ||
![]() |
47be2568ba | ||
![]() |
5944beb6a2 |
39
.github/workflows/publish-binaries.yml
vendored
Normal file
39
.github/workflows/publish-binaries.yml
vendored
Normal file
@@ -0,0 +1,39 @@
|
||||
on:
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
name: Publish binaries to release
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
name: Publish for ${{ matrix.os }}
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||
include:
|
||||
- os: ubuntu-latest
|
||||
artifact_name: fourth
|
||||
asset_name: fourth-linux-amd64
|
||||
- os: macos-latest
|
||||
artifact_name: fourth
|
||||
asset_name: fourth-macos-amd64
|
||||
- os: windows-latest
|
||||
artifact_name: fourth.exe
|
||||
asset_name: fourth-windows-amd64.exe
|
||||
|
||||
steps:
|
||||
- uses: hecrj/setup-rust-action@master
|
||||
with:
|
||||
rust-version: stable
|
||||
- uses: actions/checkout@v2
|
||||
- name: Build
|
||||
run: cargo build --release --locked
|
||||
- name: Publish
|
||||
uses: svenstaro/upload-release-action@v1-release
|
||||
with:
|
||||
repo_token: ${{ secrets.PUBLISH_TOKEN }}
|
||||
file: target/release/${{ matrix.artifact_name }}
|
||||
asset_name: ${{ matrix.asset_name }}
|
||||
tag: ${{ github.ref }}
|
670
Cargo.lock
generated
670
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "fourth"
|
||||
version = "0.1.3"
|
||||
version = "0.1.5"
|
||||
edition = "2021"
|
||||
authors = ["LI Rui <lr_cn@outlook.com>"]
|
||||
license = "Apache-2.0"
|
||||
@@ -19,12 +19,13 @@ exclude = [".*"]
|
||||
log = "0.4"
|
||||
pretty_env_logger = "0.4"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_yaml = "0.8"
|
||||
serde_yaml = "0.9.21"
|
||||
futures = "0.3"
|
||||
tls-parser = "0.11"
|
||||
url = "2.2.2"
|
||||
time = { version = "0.3.1", features = ["local-offset", "formatting"] }
|
||||
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
|
||||
bytes = "1.1"
|
||||
kcp = "0.4"
|
||||
byte_string = "1"
|
29
README-EN.md
29
README-EN.md
@@ -31,36 +31,35 @@ Or you can use Cargo to install Fourth:
|
||||
$ cargo install fourth
|
||||
```
|
||||
|
||||
Or you can download binary file form the Release page.
|
||||
|
||||
## Configuration
|
||||
|
||||
Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, here is an example:
|
||||
Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, and you can set custom path to environment variable `FOURTH_CONFIG`, here is an minimal viable example:
|
||||
|
||||
```yaml
|
||||
version: 1
|
||||
log: info
|
||||
|
||||
servers:
|
||||
example_server:
|
||||
listen:
|
||||
- "0.0.0.0:443"
|
||||
- "[::]:443"
|
||||
tls: true # Enable TLS features like SNI
|
||||
sni:
|
||||
proxy.example.com: proxy
|
||||
www.example.com: nginx
|
||||
default: ban
|
||||
relay_server:
|
||||
proxy_server:
|
||||
listen:
|
||||
- "127.0.0.1:8081"
|
||||
default: remote
|
||||
|
||||
upstream:
|
||||
nginx: "127.0.0.1:8080"
|
||||
proxy: "127.0.0.1:1024"
|
||||
remote: "www.remote.example.com:8082" # proxy to remote address
|
||||
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
|
||||
```
|
||||
|
||||
Built-in two upstreams: ban(terminate connection immediately), echo
|
||||
Built-in two upstreams: ban(terminate connection immediately), echo. For detailed configuration, check [this example](./example-config.yaml).
|
||||
|
||||
## Performance Benchmark
|
||||
|
||||
Tested on 4C2G server:
|
||||
|
||||
Use fourth to proxy to Nginx(QPS of direct connection: ~120000): ~70000 req/s (Command: `wrk -t200 -c1000 -d120s --latency http://proxy-server:8081`)
|
||||
|
||||
Use fourth to proxy to local iperf3: 8Gbps
|
||||
|
||||
## Thanks
|
||||
|
||||
|
34
README.md
34
README.md
@@ -33,41 +33,37 @@ $ cargo build --release
|
||||
$ cargo install fourth
|
||||
```
|
||||
|
||||
或者您也可以直接从Release中下载二进制文件。
|
||||
|
||||
## 配置
|
||||
|
||||
Fourth使用yaml格式的配置文件,默认情况下会读取`/etc/fourth/config.yaml`,如下是一个示例配置。
|
||||
Fourth使用yaml格式的配置文件,默认情况下会读取`/etc/fourth/config.yaml`,您也可以设置自定义路径到环境变量`FOURTH_CONFIG`,如下是一个最小有效配置:
|
||||
|
||||
```yaml
|
||||
version: 1
|
||||
log: info
|
||||
|
||||
servers:
|
||||
example_server:
|
||||
listen:
|
||||
- "0.0.0.0:443"
|
||||
- "[::]:443"
|
||||
tls: true # Enable TLS features like SNI filtering
|
||||
sni:
|
||||
proxy.example.com: proxy
|
||||
www.example.com: nginx
|
||||
default: ban
|
||||
proxy_server:
|
||||
listen:
|
||||
- "127.0.0.1:8081"
|
||||
default: remote
|
||||
kcp_server:
|
||||
protocol: kcp # default TCP
|
||||
listen:
|
||||
- "127.0.0.1:8082"
|
||||
default: echo
|
||||
|
||||
upstream:
|
||||
nginx: "127.0.0.1:8080"
|
||||
proxy: "127.0.0.1:1024"
|
||||
remote: "www.remote.example.com:8082" # proxy to remote address
|
||||
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
|
||||
```
|
||||
|
||||
内置两个的upstream:ban(立即中断连接)、echo(返回读到的数据)。
|
||||
内置两个的upstream:ban(立即中断连接)、echo(返回读到的数据)。更详细的配置可以参考[示例配置](./example-config.yaml)。
|
||||
|
||||
注意:[::]会默认同时绑定IPv4和IPv6。
|
||||
|
||||
## 性能测试
|
||||
|
||||
在4C2G的服务器上测试:
|
||||
|
||||
使用Fourth代理到Nginx(直连QPS 120000): ~70000req/s (测试命令:`wrk -t200 -c1000 -d120s --latency http://proxy-server:8081 `)
|
||||
|
||||
使用Fourth代理到本地iperf3:8Gbps
|
||||
|
||||
## io_uring?
|
||||
|
||||
|
@@ -22,6 +22,6 @@ servers:
|
||||
default: echo
|
||||
|
||||
upstream:
|
||||
nginx: "127.0.0.1:8080"
|
||||
proxy: "127.0.0.1:1024"
|
||||
remote: "www.remote.example.com:8082" # proxy to remote address
|
||||
nginx: "tcp://127.0.0.1:8080"
|
||||
proxy: "tcp://127.0.0.1:1024"
|
||||
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
|
232
src/config.rs
232
src/config.rs
@@ -1,12 +1,25 @@
|
||||
use log::debug;
|
||||
use log::{debug, warn};
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::{Error as IOError, Read};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::sync::Mutex;
|
||||
use url::Url;
|
||||
use tokio::time::Instant;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub base: BaseConfig,
|
||||
pub base: ParsedConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Deserialize, Clone)]
|
||||
pub struct ParsedConfig {
|
||||
pub version: i32,
|
||||
pub log: Option<String>,
|
||||
pub servers: HashMap<String, ServerConfig>,
|
||||
pub upstream: HashMap<String, Upstream>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Deserialize, Clone)]
|
||||
@@ -26,6 +39,84 @@ pub struct ServerConfig {
|
||||
pub default: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub enum Upstream {
|
||||
Ban,
|
||||
Echo,
|
||||
Custom(CustomUpstream),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Addr(Mutex<Vec<SocketAddr>>);
|
||||
|
||||
impl Default for Addr {
|
||||
fn default() -> Self {
|
||||
Self(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Addr {
|
||||
fn clone(&self) -> Self {
|
||||
tokio::task::block_in_place(|| Self(Mutex::new(self.0.blocking_lock().clone())))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct CustomUpstream {
|
||||
pub name: String,
|
||||
pub addr: String,
|
||||
pub protocol: String,
|
||||
#[serde(skip_deserializing)]
|
||||
addresses: Addr,
|
||||
}
|
||||
|
||||
impl CustomUpstream {
|
||||
pub async fn resolve_addresses(&self) -> std::io::Result<()> {
|
||||
{
|
||||
let addr = self.addresses.0.lock().await;
|
||||
if addr.len() > 0 {
|
||||
debug!("Already have addresses: {:?}", &addr);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Resolving addresses for {}", &self.addr);
|
||||
let addresses = tokio::net::lookup_host(self.addr.clone()).await?;
|
||||
|
||||
let mut addr: Vec<SocketAddr> = match self.protocol.as_ref() {
|
||||
"tcp4" => addresses.into_iter().filter(|a| a.is_ipv4()).collect(),
|
||||
"tcp6" => addresses.into_iter().filter(|a| a.is_ipv6()).collect(),
|
||||
_ => addresses.collect(),
|
||||
};
|
||||
|
||||
debug!("Got addresses for {}: {:?}", &self.addr, &addr);
|
||||
debug!("Resolved at {}", OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).expect("Format"));
|
||||
|
||||
{
|
||||
let mut self_addr = self.addresses.0.lock().await;
|
||||
self_addr.clear();
|
||||
self_addr.append(&mut addr);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_addresses(&self) -> Vec<SocketAddr> {
|
||||
let a = self.addresses.0.lock().await;
|
||||
a.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CustomUpstream {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name: Default::default(),
|
||||
addr: Default::default(),
|
||||
protocol: Default::default(),
|
||||
addresses: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ConfigError {
|
||||
IO(IOError),
|
||||
@@ -41,29 +132,150 @@ impl Config {
|
||||
}
|
||||
}
|
||||
|
||||
fn load_config(path: &str) -> Result<BaseConfig, ConfigError> {
|
||||
fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
|
||||
let mut contents = String::new();
|
||||
let mut file = (File::open(path))?;
|
||||
(file.read_to_string(&mut contents))?;
|
||||
|
||||
let parsed: BaseConfig = serde_yaml::from_str(&contents)?;
|
||||
let base: BaseConfig = serde_yaml::from_str(&contents)?;
|
||||
|
||||
if parsed.version != 1 {
|
||||
if base.version != 1 {
|
||||
return Err(ConfigError::Custom(
|
||||
"Unsupported config version".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let log_level = parsed.log.clone().unwrap_or_else(|| "info".to_string());
|
||||
let log_level = base.log.clone().unwrap_or_else(|| "info".to_string());
|
||||
if !log_level.eq("disable") {
|
||||
std::env::set_var("FOURTH_LOG", log_level.clone());
|
||||
pretty_env_logger::init_custom_env("FOURTH_LOG");
|
||||
debug!("Set log level to {}", log_level);
|
||||
}
|
||||
|
||||
debug!("Config version {}", parsed.version);
|
||||
debug!("Config version {}", base.version);
|
||||
|
||||
Ok(parsed)
|
||||
let mut parsed_upstream: HashMap<String, Upstream> = HashMap::new();
|
||||
|
||||
for (name, upstream) in base.upstream.iter() {
|
||||
let upstream_url = match Url::parse(upstream) {
|
||||
Ok(url) => url,
|
||||
Err(_) => {
|
||||
return Err(ConfigError::Custom(format!(
|
||||
"Invalid upstream url {}",
|
||||
upstream
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
let upstream_host = match upstream_url.host_str() {
|
||||
Some(host) => host,
|
||||
None => {
|
||||
return Err(ConfigError::Custom(format!(
|
||||
"Invalid upstream url {}",
|
||||
upstream
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
let upsteam_port = match upstream_url.port_or_known_default() {
|
||||
Some(port) => port,
|
||||
None => {
|
||||
return Err(ConfigError::Custom(format!(
|
||||
"Invalid upstream url {}",
|
||||
upstream
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
match upstream_url.scheme() {
|
||||
"tcp" | "tcp4" | "tcp6" => {}
|
||||
_ => {
|
||||
return Err(ConfigError::Custom(format!(
|
||||
"Invalid upstream scheme {}",
|
||||
upstream
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
parsed_upstream.insert(
|
||||
name.to_string(),
|
||||
Upstream::Custom(CustomUpstream {
|
||||
name: name.to_string(),
|
||||
addr: format!("{}:{}", upstream_host, upsteam_port),
|
||||
protocol: upstream_url.scheme().to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
parsed_upstream.insert("ban".to_string(), Upstream::Ban);
|
||||
|
||||
parsed_upstream.insert("echo".to_string(), Upstream::Echo);
|
||||
|
||||
let parsed = ParsedConfig {
|
||||
version: base.version,
|
||||
log: base.log,
|
||||
servers: base.servers,
|
||||
upstream: parsed_upstream,
|
||||
};
|
||||
|
||||
verify_config(parsed)
|
||||
}
|
||||
|
||||
fn verify_config(config: ParsedConfig) -> Result<ParsedConfig, ConfigError> {
|
||||
let mut used_upstreams: HashSet<String> = HashSet::new();
|
||||
let mut upstream_names: HashSet<String> = HashSet::new();
|
||||
let mut listen_addresses: HashSet<String> = HashSet::new();
|
||||
|
||||
// Check for duplicate upstream names
|
||||
for (name, _) in config.upstream.iter() {
|
||||
if upstream_names.contains(name) {
|
||||
return Err(ConfigError::Custom(format!(
|
||||
"Duplicate upstream name {}",
|
||||
name
|
||||
)));
|
||||
}
|
||||
|
||||
upstream_names.insert(name.to_string());
|
||||
}
|
||||
|
||||
for (_, server) in config.servers.clone() {
|
||||
// check for duplicate listen addresses
|
||||
for listen in server.listen {
|
||||
if listen_addresses.contains(&listen) {
|
||||
return Err(ConfigError::Custom(format!(
|
||||
"Duplicate listen address {}",
|
||||
listen
|
||||
)));
|
||||
}
|
||||
|
||||
listen_addresses.insert(listen.to_string());
|
||||
}
|
||||
|
||||
if server.tls.unwrap_or_default() && server.sni.is_some() {
|
||||
for (_, val) in server.sni.unwrap() {
|
||||
used_upstreams.insert(val.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if server.default.is_some() {
|
||||
used_upstreams.insert(server.default.unwrap().to_string());
|
||||
}
|
||||
|
||||
for key in &used_upstreams {
|
||||
if !config.upstream.contains_key(key) {
|
||||
return Err(ConfigError::Custom(format!("Upstream {} not found", key)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for key in &upstream_names {
|
||||
if !used_upstreams.contains(key) && !key.eq("echo") && !key.eq("ban") {
|
||||
warn!("Upstream {} not used", key);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
impl From<IOError> for ConfigError {
|
||||
@@ -88,6 +300,6 @@ mod tests {
|
||||
assert_eq!(config.base.version, 1);
|
||||
assert_eq!(config.base.log.unwrap(), "disable");
|
||||
assert_eq!(config.base.servers.len(), 5);
|
||||
assert_eq!(config.base.upstream.len(), 3);
|
||||
assert_eq!(config.base.upstream.len(), 3 + 2); // Add ban and echo upstreams
|
||||
}
|
||||
}
|
||||
|
10
src/main.rs
10
src/main.rs
@@ -6,9 +6,13 @@ use crate::config::Config;
|
||||
use crate::servers::Server;
|
||||
|
||||
use log::{debug, error};
|
||||
use std::env;
|
||||
|
||||
fn main() {
|
||||
let config = match Config::new("/etc/fourth/config.yaml") {
|
||||
let config_path =
|
||||
env::var("FOURTH_CONFIG").unwrap_or_else(|_| "/etc/fourth/config.yaml".to_string());
|
||||
|
||||
let config = match Config::new(&config_path) {
|
||||
Ok(config) => config,
|
||||
Err(e) => {
|
||||
println!("Could not load config: {:?}", e);
|
||||
@@ -20,6 +24,6 @@ fn main() {
|
||||
let mut server = Server::new(config.base);
|
||||
debug!("{:?}", server);
|
||||
|
||||
let res = server.run();
|
||||
error!("Server returned an error: {:?}", res);
|
||||
let _ = server.run();
|
||||
error!("Server ended with errors");
|
||||
}
|
||||
|
@@ -1 +1 @@
|
||||
pub mod kcp;
|
||||
//pub mod kcp;
|
||||
|
@@ -5,13 +5,13 @@ use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
mod protocol;
|
||||
use crate::config::BaseConfig;
|
||||
use protocol::{kcp, tcp};
|
||||
use crate::config::{ParsedConfig, Upstream};
|
||||
use protocol::tcp;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Server {
|
||||
pub proxies: Vec<Arc<Proxy>>,
|
||||
pub config: BaseConfig,
|
||||
pub config: ParsedConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -22,11 +22,11 @@ pub struct Proxy {
|
||||
pub tls: bool,
|
||||
pub sni: Option<HashMap<String, String>>,
|
||||
pub default: String,
|
||||
pub upstream: HashMap<String, String>,
|
||||
pub upstream: HashMap<String, Upstream>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn new(config: BaseConfig) -> Self {
|
||||
pub fn new(config: ParsedConfig) -> Self {
|
||||
let mut new_server = Server {
|
||||
proxies: Vec::new(),
|
||||
config: config.clone(),
|
||||
@@ -53,6 +53,7 @@ impl Server {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let proxy = Proxy {
|
||||
name: name.clone(),
|
||||
listen: listen_addr,
|
||||
@@ -82,11 +83,29 @@ impl Server {
|
||||
let handle = tokio::spawn(async move {
|
||||
match config.protocol.as_ref() {
|
||||
"tcp" => {
|
||||
let _ = tcp::proxy(config).await;
|
||||
let res = tcp::proxy(config.clone()).await;
|
||||
if res.is_err() {
|
||||
error!("Failed to start {}: {}", config.name, res.err().unwrap());
|
||||
}
|
||||
"kcp" => {
|
||||
let _ = kcp::proxy(config).await;
|
||||
}
|
||||
"tcp4" => {
|
||||
let res = tcp::proxy(config.clone()).await;
|
||||
if res.is_err() {
|
||||
error!("Failed to start {}: {}", config.name, res.err().unwrap());
|
||||
}
|
||||
}
|
||||
"tcp6" => {
|
||||
let res = tcp::proxy(config.clone()).await;
|
||||
if res.is_err() {
|
||||
error!("Failed to start {}: {}", config.name, res.err().unwrap());
|
||||
}
|
||||
}
|
||||
// "kcp" => {
|
||||
// let res = kcp::proxy(config.clone()).await;
|
||||
// if res.is_err() {
|
||||
// error!("Failed to start {}: {}", config.name, res.err().unwrap());
|
||||
// }
|
||||
// }
|
||||
_ => {
|
||||
error!("Invalid protocol: {}", config.protocol)
|
||||
}
|
||||
@@ -103,13 +122,12 @@ impl Server {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::plugins::kcp::{KcpConfig, KcpStream};
|
||||
use std::net::SocketAddr;
|
||||
mod tests {
|
||||
//use crate::plugins::kcp::{KcpConfig, KcpStream};
|
||||
use std::thread::{self, sleep};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -117,16 +135,24 @@ mod test {
|
||||
async fn tcp_mock_server() {
|
||||
let server_addr: SocketAddr = "127.0.0.1:54599".parse().unwrap();
|
||||
let listener = TcpListener::bind(server_addr).await.unwrap();
|
||||
loop {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 1024];
|
||||
let n = stream.read(&mut buf).await.unwrap();
|
||||
if n > 0 {
|
||||
let mut buf = [0u8; 2];
|
||||
let mut n = stream.read(&mut buf).await.unwrap();
|
||||
while n > 0 {
|
||||
stream.write(b"hello").await.unwrap();
|
||||
if buf.eq(b"by") {
|
||||
stream.shutdown().await.unwrap();
|
||||
break;
|
||||
}
|
||||
n = stream.read(&mut buf).await.unwrap();
|
||||
}
|
||||
stream.shutdown().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tcp_proxy() {
|
||||
async fn test_proxy() {
|
||||
use crate::config::Config;
|
||||
let config = Config::new("tests/config.yaml").unwrap();
|
||||
let mut server = Server::new(config.base);
|
||||
@@ -139,48 +165,20 @@ mod test {
|
||||
});
|
||||
sleep(Duration::from_secs(1)); // wait for server to start
|
||||
|
||||
// // test proxy
|
||||
// let mut conn = TcpStream::connect("127.0.0.1:54500").await.unwrap();
|
||||
// let mut buf = [0u8; 5];
|
||||
// conn.write(b"hi").await.unwrap();
|
||||
// conn.read(&mut buf).await.unwrap();
|
||||
// assert_eq!(&buf, b"hello");
|
||||
// conn.shutdown().await.unwrap();
|
||||
|
||||
// test echo
|
||||
let mut conn = TcpStream::connect("127.0.0.1:54956").await.unwrap();
|
||||
let mut buf = [0u8; 1];
|
||||
for i in 0..=10u8 {
|
||||
conn.write(&[i]).await.unwrap();
|
||||
conn.read(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, &[i]);
|
||||
}
|
||||
conn.shutdown().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_kcp_proxy() {
|
||||
use crate::config::Config;
|
||||
let config = Config::new("tests/config.yaml").unwrap();
|
||||
let mut server = Server::new(config.base);
|
||||
thread::spawn(move || {
|
||||
let _ = server.run();
|
||||
});
|
||||
sleep(Duration::from_secs(1)); // wait for server to start
|
||||
|
||||
// test proxy
|
||||
let kcp_config = KcpConfig::default();
|
||||
let server_addr: SocketAddr = "127.0.0.1:54958".parse().unwrap();
|
||||
let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
|
||||
// test TCP proxy
|
||||
let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54500")
|
||||
.await
|
||||
.unwrap();
|
||||
let mut buf = [0u8; 5];
|
||||
conn.write(b"hi").await.unwrap();
|
||||
conn.read(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"hello");
|
||||
conn.shutdown().await.unwrap();
|
||||
|
||||
// test echo
|
||||
let kcp_config = KcpConfig::default();
|
||||
let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap();
|
||||
let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
|
||||
// test TCP echo
|
||||
let mut conn = tokio::net::TcpStream::connect("127.0.0.1:54956")
|
||||
.await
|
||||
.unwrap();
|
||||
let mut buf = [0u8; 1];
|
||||
for i in 0..=10u8 {
|
||||
conn.write(&[i]).await.unwrap();
|
||||
@@ -188,5 +186,27 @@ mod test {
|
||||
assert_eq!(&buf, &[i]);
|
||||
}
|
||||
conn.shutdown().await.unwrap();
|
||||
|
||||
// test KCP echo
|
||||
// let kcp_config = KcpConfig::default();
|
||||
// let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap();
|
||||
// let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
|
||||
// let mut buf = [0u8; 1];
|
||||
// for i in 0..=10u8 {
|
||||
// conn.write(&[i]).await.unwrap();
|
||||
// conn.read(&mut buf).await.unwrap();
|
||||
// assert_eq!(&buf, &[i]);
|
||||
// }
|
||||
// conn.shutdown().await.unwrap();
|
||||
//
|
||||
// // test KCP proxy and close mock server
|
||||
// let kcp_config = KcpConfig::default();
|
||||
// let server_addr: SocketAddr = "127.0.0.1:54958".parse().unwrap();
|
||||
// let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
|
||||
// let mut buf = [0u8; 5];
|
||||
// conn.write(b"by").await.unwrap();
|
||||
// conn.read(&mut buf).await.unwrap();
|
||||
// assert_eq!(&buf, b"hello");
|
||||
// conn.shutdown().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
use crate::config::Upstream;
|
||||
use crate::plugins::kcp::{KcpConfig, KcpListener, KcpStream};
|
||||
use crate::servers::Proxy;
|
||||
use futures::future::try_join;
|
||||
@@ -52,25 +53,30 @@ async fn accept(
|
||||
"No upstream named {:?} on server {:?}",
|
||||
proxy.default, proxy.name
|
||||
);
|
||||
return process(inbound, &proxy.default).await;
|
||||
return process(inbound, proxy.upstream.get(&proxy.default).unwrap()).await;
|
||||
// ToDo: Remove unwrap and check default option
|
||||
}
|
||||
};
|
||||
return process(inbound, upstream).await;
|
||||
}
|
||||
|
||||
async fn process(mut inbound: KcpStream, upstream: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if upstream == "ban" {
|
||||
async fn process(
|
||||
mut inbound: KcpStream,
|
||||
upstream: &Upstream,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match upstream {
|
||||
Upstream::Ban => {
|
||||
let _ = inbound.shutdown();
|
||||
return Ok(());
|
||||
} else if upstream == "echo" {
|
||||
}
|
||||
Upstream::Echo => {
|
||||
let (mut ri, mut wi) = io::split(inbound);
|
||||
let inbound_to_inbound = copy(&mut ri, &mut wi);
|
||||
let bytes_tx = inbound_to_inbound.await;
|
||||
debug!("Bytes read: {:?}", bytes_tx);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let outbound = TcpStream::connect(upstream).await?;
|
||||
Upstream::Custom(custom) => match custom.protocol.as_ref() {
|
||||
"tcp" => {
|
||||
let outbound = TcpStream::connect(custom.addr.clone()).await?;
|
||||
|
||||
let (mut ri, mut wi) = io::split(inbound);
|
||||
let (mut ro, mut wo) = io::split(outbound);
|
||||
@@ -78,10 +84,16 @@ async fn process(mut inbound: KcpStream, upstream: &str) -> Result<(), Box<dyn s
|
||||
let inbound_to_outbound = copy(&mut ri, &mut wo);
|
||||
let outbound_to_inbound = copy(&mut ro, &mut wi);
|
||||
|
||||
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
|
||||
let (bytes_tx, bytes_rx) =
|
||||
try_join(inbound_to_outbound, outbound_to_inbound).await?;
|
||||
|
||||
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
|
||||
|
||||
}
|
||||
_ => {
|
||||
error!("Reached unknown protocol: {:?}", custom.protocol);
|
||||
}
|
||||
},
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@@ -1,3 +1,3 @@
|
||||
pub mod kcp;
|
||||
//pub mod kcp;
|
||||
pub mod tcp;
|
||||
pub mod tls;
|
||||
|
@@ -1,7 +1,8 @@
|
||||
use crate::config::Upstream;
|
||||
use crate::servers::protocol::tls::get_sni;
|
||||
use crate::servers::Proxy;
|
||||
use futures::future::try_join;
|
||||
use log::{debug, error, warn};
|
||||
use log::{debug, error, info, warn};
|
||||
use std::sync::Arc;
|
||||
use tokio::io;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
@@ -33,7 +34,7 @@ pub async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>>
|
||||
}
|
||||
|
||||
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
|
||||
debug!("New connection from {:?}", inbound.peer_addr()?);
|
||||
info!("New connection from {:?}", inbound.peer_addr()?);
|
||||
|
||||
let upstream_name = match proxy.tls {
|
||||
false => proxy.default.clone(),
|
||||
@@ -71,25 +72,46 @@ async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std
|
||||
"No upstream named {:?} on server {:?}",
|
||||
proxy.default, proxy.name
|
||||
);
|
||||
return process(inbound, &proxy.default).await;
|
||||
return process(inbound, proxy.upstream.get(&proxy.default).unwrap().clone()).await;
|
||||
// ToDo: Remove unwrap and check default option
|
||||
}
|
||||
};
|
||||
return process(inbound, upstream).await;
|
||||
|
||||
match upstream {
|
||||
Upstream::Custom(u) => u.resolve_addresses().await?,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if upstream == "ban" {
|
||||
return process(inbound, upstream.clone()).await;
|
||||
}
|
||||
|
||||
async fn process(
|
||||
mut inbound: TcpStream,
|
||||
upstream: Upstream,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match upstream {
|
||||
Upstream::Ban => {
|
||||
let _ = inbound.shutdown();
|
||||
return Ok(());
|
||||
} else if upstream == "echo" {
|
||||
}
|
||||
Upstream::Echo => {
|
||||
let (mut ri, mut wi) = io::split(inbound);
|
||||
let inbound_to_inbound = copy(&mut ri, &mut wi);
|
||||
let bytes_tx = inbound_to_inbound.await;
|
||||
debug!("Bytes read: {:?}", bytes_tx);
|
||||
return Ok(());
|
||||
}
|
||||
Upstream::Custom(custom) => {
|
||||
custom.resolve_addresses().await?;
|
||||
let outbound = match custom.protocol.as_ref() {
|
||||
"tcp4" | "tcp6" | "tcp" => {
|
||||
TcpStream::connect(custom.get_addresses().await.as_slice()).await?
|
||||
}
|
||||
_ => {
|
||||
error!("Reached unknown protocol: {:?}", custom.protocol);
|
||||
return Err("Reached unknown protocol".into());
|
||||
}
|
||||
};
|
||||
|
||||
let outbound = TcpStream::connect(upstream).await?;
|
||||
debug!("Connected to {:?}", outbound.peer_addr().unwrap());
|
||||
|
||||
let (mut ri, mut wi) = io::split(inbound);
|
||||
let (mut ro, mut wo) = io::split(outbound);
|
||||
@@ -100,7 +122,8 @@ async fn process(mut inbound: TcpStream, upstream: &str) -> Result<(), Box<dyn s
|
||||
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
|
||||
|
||||
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
|
||||
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@@ -49,6 +49,7 @@ pub fn get_sni(buf: &[u8]) -> Vec<String> {
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Found SNIs: {:?}", &snis);
|
||||
snis
|
||||
}
|
||||
|
||||
|
@@ -31,6 +31,6 @@ servers:
|
||||
default: echo
|
||||
|
||||
upstream:
|
||||
web: "127.0.0.1:8080"
|
||||
proxy: "www.example.com:1024"
|
||||
tester: "127.0.0.1:54599"
|
||||
web: "tcp://127.0.0.1:8080"
|
||||
proxy: "tcp://www.example.com:1024"
|
||||
tester: "tcp://127.0.0.1:54599"
|
Reference in New Issue
Block a user