Compare commits

..

22 Commits

Author SHA1 Message Date
4592c94586 Reintroduce L4P_CONFIG environment variable
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
This points to a user-configured configuration file.

Closes #5.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 23:53:47 +01:00
6284870059 Rename config::config to config::config_v1
To prevent module inception, which was a clippy warning.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 23:34:19 +01:00
97b4bf6bbe Solve synchronization issue
The async mutex in the previous variant would fail when used in a single
threaded mode, because block_in_place() cannot be used there.

Instead, replace the code with a Arc<RwLock> inside of the
UpstreamAddress to let that class take care of its own mutability.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 23:31:23 +01:00
59c7128f93 Remove kcp support
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 22:49:43 +01:00
9d9f89881d Improve config file handling
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 22:03:25 +01:00
ee67f7883e Rename to l4p, update references and README.md
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-23 22:03:25 +01:00
77bc8364f2 Update dependencies to latest versions
Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-22 21:59:14 +01:00
ec9ab1d2bc Add example systemd unit with security protections
This is just about as secure as this process can get

Signed-off-by: Jacob Kiers <code@kiers.eu>
2024-02-22 21:49:58 +01:00
bb81a32349 Deduplicate some code
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 13:34:40 +02:00
17b39dc6bc Prepare for new config version
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 13:34:40 +02:00
07fccb6b2a Clippy
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 00:26:19 +02:00
3a2367ef28 Moved upstreams to their own dedicated namespace
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-05 00:26:06 +02:00
2116659a14 Sort dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 23:34:26 +02:00
8404f38182 Move ProxyToUpstream parsing to TryFrom trait
This seems cleaner to me than parsing it externally.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 23:27:42 +02:00
23296c6436 Improve code style
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 23:27:42 +02:00
84f0499ec8 Remove unnecessary manual Default implementations
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:32:36 +02:00
ae594135a1 Update dependencies
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:31:43 +02:00
9564fbed6e Fix clippy warnings
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:51 +02:00
a574163aef Rename Upstream::Custom to Upstream::Proxy
And CustomUpstream to ProxyToUpstream.

Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:51 +02:00
2651ec1f4a Fix kcp module
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:51 +02:00
8dae1126d5 Deduplicate copy method
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 22:14:50 +02:00
da46c5873f Fix typo
Signed-off-by: Jacob Kiers <code@kiers.eu>
2023-10-04 20:48:01 +02:00
28 changed files with 632 additions and 1706 deletions

View File

@ -1,4 +1,4 @@
local executableName = 'fourth';
local executableName = 'l4p';
local build_image = 'img.kie.rs/jjkiers/rust-cross:rust1.71.1-zig';
local archs = [

489
Cargo.lock generated
View File

@ -4,9 +4,9 @@ version = 3
[[package]]
name = "addr2line"
version = "0.20.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
@ -19,22 +19,22 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
version = "1.0.4"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a"
checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0"
dependencies = [
"memchr",
]
[[package]]
name = "atty"
version = "0.2.14"
name = "async-trait"
version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
"proc-macro2",
"quote",
"syn 2.0.50",
]
[[package]]
@ -45,9 +45,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.68"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
@ -72,18 +72,15 @@ checksum = "11aade7a05aa8c3a351cedc44c3fc45806430543382fcc4743a9b757a2a0b4ed"
[[package]]
name = "bytes"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]]
name = "cc"
version = "1.0.82"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01"
dependencies = [
"libc",
]
checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730"
[[package]]
name = "cfg-if"
@ -93,9 +90,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "deranged"
version = "0.3.7"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929"
checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
dependencies = [
"powerfmt",
]
[[package]]
name = "enum_primitive"
@ -108,12 +108,12 @@ dependencies = [
[[package]]
name = "env_logger"
version = "0.7.1"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580"
dependencies = [
"atty",
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
@ -127,35 +127,18 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "form_urlencoded"
version = "1.2.0"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
dependencies = [
"percent-encoding",
]
[[package]]
name = "fourth"
version = "0.1.7"
dependencies = [
"byte_string",
"bytes",
"futures",
"log",
"pretty_env_logger",
"serde",
"serde_yaml",
"time",
"tls-parser",
"tokio",
"url",
]
[[package]]
name = "futures"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
@ -168,9 +151,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
@ -178,15 +161,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
@ -195,38 +178,38 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.28",
"syn 2.0.50",
]
[[package]]
name = "futures-sink"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
@ -242,9 +225,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.10"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
dependencies = [
"cfg-if",
"libc",
@ -253,45 +236,33 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.27.3"
version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "hashbrown"
version = "0.14.0"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
[[package]]
name = "hermit-abi"
version = "0.1.19"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd"
[[package]]
name = "humantime"
version = "1.3.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "idna"
version = "0.4.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
@ -299,31 +270,60 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.0.0"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177"
dependencies = [
"equivalent",
"hashbrown",
]
[[package]]
name = "itoa"
version = "1.0.9"
name = "is-terminal"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "itoa"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "layer4-proxy"
version = "0.1.7"
dependencies = [
"async-trait",
"byte_string",
"bytes",
"futures",
"log",
"pretty_env_logger",
"serde",
"serde_yaml",
"time",
"tls-parser",
"tokio",
"url",
]
[[package]]
name = "libc"
version = "0.2.147"
version = "0.2.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
name = "lock_api"
version = "0.4.10"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"scopeguard",
@ -337,9 +337,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "memchr"
version = "2.5.0"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
[[package]]
name = "minimal-lexical"
@ -349,22 +349,22 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.8"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [
"libc",
"wasi",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -399,20 +399,26 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31"
dependencies = [
"num-traits 0.2.16",
"num-traits 0.2.18",
]
[[package]]
name = "num-traits"
version = "0.2.16"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [
"autocfg",
]
@ -423,24 +429,24 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi 0.3.2",
"hermit-abi",
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.31.1"
version = "0.32.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1"
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
dependencies = [
"memchr",
]
@ -457,22 +463,22 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.8"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
"windows-targets 0.48.5",
]
[[package]]
name = "percent-encoding"
version = "2.3.0"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "phf"
@ -514,9 +520,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.12"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
@ -524,6 +530,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -532,9 +544,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pretty_env_logger"
version = "0.4.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d"
checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c"
dependencies = [
"env_logger",
"log",
@ -542,24 +554,18 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.66"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.32"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
dependencies = [
"proc-macro2",
]
@ -596,18 +602,18 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.3.5"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.9.3"
version = "1.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a"
checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
dependencies = [
"aho-corasick",
"memchr",
@ -617,9 +623,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.3.6"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
dependencies = [
"aho-corasick",
"memchr",
@ -628,9 +634,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.7.4"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "rustc-demangle"
@ -655,9 +661,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]]
name = "ryu"
version = "1.0.15"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "scopeguard"
@ -667,29 +673,29 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.183"
version = "1.0.197"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.183"
version = "1.0.197"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.28",
"syn 2.0.50",
]
[[package]]
name = "serde_yaml"
version = "0.9.25"
version = "0.9.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a49e178e4452f45cb61d0cd8cebc1b0fafd3e41929e996cef79aa3aca91f574"
checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f"
dependencies = [
"indexmap",
"itoa",
@ -709,33 +715,33 @@ dependencies = [
[[package]]
name = "siphasher"
version = "0.3.10"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "slab"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.11.0"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "socket2"
version = "0.5.3"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -751,9 +757,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.28"
version = "2.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567"
checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb"
dependencies = [
"proc-macro2",
"quote",
@ -762,23 +768,25 @@ dependencies = [
[[package]]
name = "termcolor"
version = "1.2.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]]
name = "time"
version = "0.3.25"
version = "0.3.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea"
checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749"
dependencies = [
"deranged",
"itoa",
"libc",
"num-conv",
"num_threads",
"powerfmt",
"serde",
"time-core",
"time-macros",
@ -786,16 +794,17 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
[[package]]
name = "time-macros"
version = "0.2.11"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb71511c991639bb078fd5bf97757e03914361c48100d52878b8e52b46fb92cd"
checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774"
dependencies = [
"num-conv",
"time-core",
]
@ -830,9 +839,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.31.0"
version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [
"backtrace",
"bytes",
@ -844,52 +853,52 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.28",
"syn 2.0.50",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
[[package]]
name = "unicode-ident"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
dependencies = [
"tinyvec",
]
[[package]]
name = "unsafe-libyaml"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa"
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b"
[[package]]
name = "url"
version = "2.4.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
dependencies = [
"form_urlencoded",
"idna",
@ -920,9 +929,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
dependencies = [
"winapi",
]
@ -939,62 +948,128 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets",
"windows-targets 0.48.5",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.3",
]
[[package]]
name = "windows-targets"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1eeca1c172a285ee6c2c84c341ccea837e7c01b12fbb2d0fe3c9e550ce49ec8"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
"windows_aarch64_gnullvm 0.48.5",
"windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm 0.48.5",
"windows_x86_64_msvc 0.48.5",
]
[[package]]
name = "windows-targets"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f"
dependencies = [
"windows_aarch64_gnullvm 0.52.3",
"windows_aarch64_msvc 0.52.3",
"windows_i686_gnu 0.52.3",
"windows_i686_msvc 0.52.3",
"windows_x86_64_gnu 0.52.3",
"windows_x86_64_gnullvm 0.52.3",
"windows_x86_64_msvc 0.52.3",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b10d0c968ba7f6166195e13d593af609ec2e3d24f916f081690695cf5eaffb2f"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "571d8d4e62f26d4932099a9efe89660e8bd5087775a2ab5cdd8b747b811f1058"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f"
[[package]]
name = "windows_i686_gnu"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2229ad223e178db5fbbc8bd8d3835e51e566b8474bfca58d2e6150c48bb723cd"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb"
[[package]]
name = "windows_i686_msvc"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "600956e2d840c194eedfc5d18f8242bc2e17c7775b6684488af3a9fff6fe3287"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea99ff3f8b49fb7a8e0d305e5aec485bd068c2ba691b6e277d29eaeac945868a"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1a05a1ece9a7a0d5a7ccf30ba2c33e3a61a30e042ffd247567d1de1d94120d"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.2"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d419259aba16b663966e29e6d7c6ecfa0bb8425818bb96f6f1f3c3eb71a6e7b9"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6"

View File

@ -1,13 +1,13 @@
[package]
name = "fourth"
name = "layer4-proxy"
version = "0.1.7"
edition = "2021"
authors = ["LI Rui <lr_cn@outlook.com>"]
authors = ["Jacob Kiers <code@kiers.eu>"]
license = "Apache-2.0"
description = "Simple and fast layer 4 proxy in Rust"
readme = "README.md"
homepage = "https://github.com/KernelErr/fourth"
repository = "https://github.com/KernelErr/fourth"
homepage = "https://code.kiers.eu/jjkiers/layer4-proxy"
repository = "https://code.kiers.eu/jjkiers/layer4-proxy"
keywords = ["proxy", "network"]
categories = ["web-programming"]
@ -15,16 +15,20 @@ categories = ["web-programming"]
exclude = [".*"]
[[bin]]
name = "l4p"
path = "src/main.rs"
[dependencies]
async-trait = "0.1.73"
byte_string = "1"
bytes = "1.1"
futures = "0.3"
log = "0.4"
pretty_env_logger = "0.4"
pretty_env_logger = "0.5"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9.21"
futures = "0.3"
tls-parser = "0.11"
url = "2.2.2"
time = { version = "0.3.1", features = ["local-offset", "formatting"] }
tls-parser = "0.11"
tokio = { version = "1.0", features = ["full"] }
bytes = "1.1"
byte_string = "1"
url = "2.2.2"

View File

@ -1,80 +0,0 @@
# Fourth
> 这一波在第四层。
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
[English](/README-EN.md)
**积极开发中0.1版本迭代可能较快**
Fourth是一个Rust实现的Layer 4代理用于监听指定端口TCP/KCP流量并根据规则转发到指定目标目前只支持TCP
## 功能
- 监听指定端口代理到本地或远端指定端口
- 监听指定端口通过TLS ClientHello消息中的SNI进行分流
- 支持KCP入站警告未测试
## 安装方法
为了确保获得您架构下的最佳性能,请考虑自行编译,首选需要确保您拥有[Rust工具链](https://rustup.rs/)。
```bash
$ cd fourth
$ cargo build --release
```
将在`target/release/fourth`生成二进制文件,您也可以使用`cargo install --path . `来安装二进制文件。
或者您也可以使用Cargo直接安装
```bash
$ cargo install fourth
```
或者您也可以直接从Release中下载二进制文件。
## 配置
Fourth使用yaml格式的配置文件默认情况下会读取`/etc/fourth/config.yaml`,您也可以设置自定义路径到环境变量`FOURTH_CONFIG`,如下是一个最小有效配置:
```yaml
version: 1
log: info
servers:
proxy_server:
listen:
- "127.0.0.1:8081"
default: remote
upstream:
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
```
内置两个的upstreamban立即中断连接、echo返回读到的数据。更详细的配置可以参考[示例配置](./example-config.yaml)。
注意:[::]会默认同时绑定IPv4和IPv6。
## 性能测试
在4C2G的服务器上测试
使用Fourth代理到Nginx直连QPS 120000: ~70000req/s (测试命令:`wrk -t200 -c1000 -d120s --latency http://proxy-server:8081 `
使用Fourth代理到本地iperf38Gbps
## io_uring?
尽管经过了很多尝试我们发现目前一些Rust下面的io_uring实现存在问题我们使用的io_uring库实现尽管在吞吐量上可以做到单线程20Gbps相比之下Tokio仅有8Gbps但在QPS上存在性能损失较大的问题。因此在有成熟的io_uring实现之前我们仍然选择epoll。之后我们会持续关注相关进展。
可能以后会为Linux高内核版本的用户提供可选的io_uring加速。
## 感谢
- [tokio_kcp](https://github.com/Matrix-Zhang/tokio_kcp)
## 协议
Fourth以Apache-2.0协议开源。

View File

@ -1,41 +1,39 @@
# Fourth
# l4p
> Hey, now we are on level 4!
[![](https://img.shields.io/crates/v/fourth)](https://crates.io/crates/fourth) [![CI](https://img.shields.io/github/workflow/status/kernelerr/fourth/Rust)](https://github.com/KernelErr/fourth/actions/workflows/rust.yml)
![CI](https://drone-ci.kiers.eu/api/badges/jjkiers/layer4-proxy/status.svg)
**Under heavy development, version 0.1 may update frequently**
Fourth is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP/KCP data to remote addresses(only TCP) according to configuration.
`l4p` is a layer 4 proxy implemented by Rust to listen on specific ports and transfer TCP data to remote addresses (only TCP) according to the configuration.
## Features
- Listen on specific port and proxy to local or remote port
- SNI-based rule without terminating TLS connection
- Allow KCP inbound(warning: untested)
- DNS-based backend with periodic resolution
## Installation
To gain best performance on your computer's architecture, please consider build the source code. First, you may need [Rust tool chain](https://rustup.rs/).
```bash
$ cd fourth
$ cd l4p
$ cargo build --release
```
Binary file will be generated at `target/release/fourth`, or you can use `cargo install --path .` to install.
Binary file will be generated at `target/release/l4p`, or you can use `cargo install --path .` to install.
Or you can use Cargo to install Fourth:
Or you can use Cargo to install `l4p`:
```bash
$ cargo install fourth
$ cargo install l4p
```
Or you can download binary file form the Release page.
## Configuration
Fourth will read yaml format configuration file from `/etc/fourth/config.yaml`, and you can set custom path to environment variable `FOURTH_CONFIG`, here is an minimal viable example:
`l4p` will read yaml format configuration file from `/etc/l4p/l4p.yaml`, and you can set custom path to environment variable `L4P_CONFIG`, here is an minimal viable example:
```yaml
version: 1
@ -51,20 +49,16 @@ upstream:
remote: "tcp://www.remote.example.com:8082" # proxy to remote address
```
Built-in two upstreams: ban(terminate connection immediately), echo. For detailed configuration, check [this example](./example-config.yaml).
There are two upstreams built in:
* Ban, which terminates the connection immediately
* Echo, which reflects back with the input
## Performance Benchmark
Tested on 4C2G server:
Use fourth to proxy to Nginx(QPS of direct connection: ~120000): ~70000 req/s (Command: `wrk -t200 -c1000 -d120s --latency http://proxy-server:8081`)
Use fourth to proxy to local iperf3: 8Gbps
For detailed configuration, check [this example](./config.yaml.example).
## Thanks
- [tokio_kcp](https://github.com/Matrix-Zhang/tokio_kcp)
- [`l4p`](https://crates.io/crates/`l4p`), of which this is a heavily modified fork.
## License
Fourth is available under terms of Apache-2.0.
`l4p` is available under terms of Apache-2.0.

View File

@ -2,15 +2,20 @@ version: 1
log: debug
servers:
example_server:
first_server:
listen:
- "0.0.0.0:8443"
- "[::]:8443"
tls: true # Enable TLS features like SNI filtering
sni:
api.example.org: example-api
www.example.org: gh-proxy
www.example.org: proxy
default: ban
second-server:
listen: [ "127.0.0.1:8080" ]
default: echo
upstream:
proxy: "tcp://new-www.example.org:443" # Connect over IPv4 or IPv6 to new-www.example.org:443
example-api: "tcp6://api-v1.example.com:443" # Connect over IPv6 to api-v1.example.com:443

View File

@ -15,11 +15,6 @@ servers:
listen:
- "127.0.0.1:8081"
default: remote
kcp_server:
protocol: kcp # default TCP
listen:
- "127.0.0.1:8082"
default: echo
upstream:
nginx: "tcp://127.0.0.1:8080"

51
l4p.service Normal file
View File

@ -0,0 +1,51 @@
[Unit]
Description=l4p - Layer 4 proxy
After=network-online.target
Wants=network-online.target
[Install]
WantedBy=default.target
[Service]
Type=simple
# Allow read-only access to the config directory
ReadOnlyPaths=/etc/l4p
# Path to the binary
ExecStart=/usr/local/bin/l4p
# Needs CAP_NET_BIND_SERVICE in order to bind to lower ports
# When using ports above 1024, these should be made empty
AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
# Run as a dynamic user
DynamicUser=yes
# Security
PrivateTmp=yes
PrivateDevices=yes
ProtectSystem=strict
ProtectHome=yes
SystemCallFilter=@basic-io @file-system @network-io @system-service
SystemCallFilter=~@privileged
SystemCallFilter=~@resources
NoNewPrivileges=yes
ProtectProc=invisible
RemoveIPC=yes
RestrictAddressFamilies=AF_INET AF_INET6
RestrictNamespaces=yes
ProtectHostname=yes
ProtectClock=yes
ProtectKernelModules=yes
ProtectKernelLogs=yes
ProtectControlGroups=yes
LockPersonality=yes
MemoryDenyWriteExecute=yes
RestrictRealtime=yes
ProcSubset=pid
UMask=0077
SystemCallArchitectures=native
RestrictSUIDSGID=yes
ProtectKernelTunables=yes

View File

@ -1,20 +1,19 @@
use crate::servers::upstream_address::UpstreamAddress;
use log::{debug, warn};
use crate::upstreams::ProxyToUpstream;
use crate::upstreams::Upstream;
use log::{debug, info, warn};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{Error as IOError, Read};
use std::net::SocketAddr;
use tokio::sync::Mutex;
use url::Url;
#[derive(Debug, Clone)]
pub struct Config {
pub base: ParsedConfig,
pub struct ConfigV1 {
pub base: ParsedConfigV1,
}
#[derive(Debug, Default, Deserialize, Clone)]
pub struct ParsedConfig {
pub struct ParsedConfigV1 {
pub version: i32,
pub log: Option<String>,
pub servers: HashMap<String, ServerConfig>,
@ -37,54 +36,55 @@ pub struct ServerConfig {
pub sni: Option<HashMap<String, String>>,
pub default: Option<String>,
}
impl TryInto<ProxyToUpstream> for &str {
type Error = ConfigError;
#[derive(Debug, Clone, Deserialize)]
pub enum Upstream {
Ban,
Echo,
Custom(CustomUpstream),
}
#[derive(Debug)]
struct Addr(Mutex<UpstreamAddress>);
impl Default for Addr {
fn default() -> Self {
Self(Default::default())
fn try_into(self) -> Result<ProxyToUpstream, Self::Error> {
let upstream_url = match Url::parse(self) {
Ok(url) => url,
Err(_) => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
self
)))
}
}
};
impl Clone for Addr {
fn clone(&self) -> Self {
tokio::task::block_in_place(|| Self(Mutex::new(self.0.blocking_lock().clone())))
let upstream_host = match upstream_url.host_str() {
Some(host) => host,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
self
)))
}
}
};
#[derive(Debug, Clone, Deserialize)]
pub struct CustomUpstream {
pub name: String,
pub addr: String,
pub protocol: String,
#[serde(skip_deserializing)]
addresses: Addr,
}
impl CustomUpstream {
pub async fn resolve_addresses(&self) -> std::io::Result<Vec<SocketAddr>> {
let mut addr = self.addresses.0.lock().await;
addr.resolve((*self.protocol).into()).await
let upstream_port = match upstream_url.port_or_known_default() {
Some(port) => port,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
self
)))
}
}
};
impl Default for CustomUpstream {
fn default() -> Self {
Self {
name: Default::default(),
addr: Default::default(),
protocol: Default::default(),
addresses: Default::default(),
match upstream_url.scheme() {
"tcp" | "tcp4" | "tcp6" => {}
_ => {
return Err(ConfigError::Custom(format!(
"Invalid upstream scheme {}",
self
)))
}
}
Ok(ProxyToUpstream::new(
format!("{}:{}", upstream_host, upstream_port),
upstream_url.scheme().to_string(),
))
}
}
#[derive(Debug)]
@ -94,18 +94,18 @@ pub enum ConfigError {
Custom(String),
}
impl Config {
pub fn new(path: &str) -> Result<Config, ConfigError> {
let base = (load_config(path))?;
impl ConfigV1 {
pub fn new(path: &str) -> Result<ConfigV1, ConfigError> {
let base = load_config(path)?;
Ok(Config { base })
Ok(ConfigV1 { base })
}
}
fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
fn load_config(path: &str) -> Result<ParsedConfigV1, ConfigError> {
let mut contents = String::new();
let mut file = (File::open(path))?;
(file.read_to_string(&mut contents))?;
let mut file = File::open(path)?;
file.read_to_string(&mut contents)?;
let base: BaseConfig = serde_yaml::from_str(&contents)?;
@ -119,74 +119,23 @@ fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
if !log_level.eq("disable") {
std::env::set_var("FOURTH_LOG", log_level.clone());
pretty_env_logger::init_custom_env("FOURTH_LOG");
debug!("Set log level to {}", log_level);
}
info!("Using config file: {}", &path);
debug!("Set log level to {}", log_level);
debug!("Config version {}", base.version);
let mut parsed_upstream: HashMap<String, Upstream> = HashMap::new();
for (name, upstream) in base.upstream.iter() {
let upstream_url = match Url::parse(upstream) {
Ok(url) => url,
Err(_) => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
upstream
)))
}
};
let upstream_host = match upstream_url.host_str() {
Some(host) => host,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
upstream
)))
}
};
let upsteam_port = match upstream_url.port_or_known_default() {
Some(port) => port,
None => {
return Err(ConfigError::Custom(format!(
"Invalid upstream url {}",
upstream
)))
}
};
match upstream_url.scheme() {
"tcp" | "tcp4" | "tcp6" => {}
_ => {
return Err(ConfigError::Custom(format!(
"Invalid upstream scheme {}",
upstream
)))
}
}
parsed_upstream.insert(
name.to_string(),
Upstream::Custom(CustomUpstream {
name: name.to_string(),
addr: format!("{}:{}", upstream_host, upsteam_port),
protocol: upstream_url.scheme().to_string(),
addresses: Addr(Mutex::new(UpstreamAddress::new(format!(
"{}:{}",
upstream_host, upsteam_port
)))),
..Default::default()
}),
);
}
parsed_upstream.insert("ban".to_string(), Upstream::Ban);
parsed_upstream.insert("echo".to_string(), Upstream::Echo);
let parsed = ParsedConfig {
for (name, upstream) in base.upstream.iter() {
let ups = upstream.as_str().try_into()?;
parsed_upstream.insert(name.to_string(), Upstream::Proxy(ups));
}
let parsed = ParsedConfigV1 {
version: base.version,
log: base.log,
servers: base.servers,
@ -196,7 +145,7 @@ fn load_config(path: &str) -> Result<ParsedConfig, ConfigError> {
verify_config(parsed)
}
fn verify_config(config: ParsedConfig) -> Result<ParsedConfig, ConfigError> {
fn verify_config(config: ParsedConfigV1) -> Result<ParsedConfigV1, ConfigError> {
let mut used_upstreams: HashSet<String> = HashSet::new();
let mut upstream_names: HashSet<String> = HashSet::new();
let mut listen_addresses: HashSet<String> = HashSet::new();
@ -270,10 +219,10 @@ mod tests {
#[test]
fn test_load_config() {
let config = Config::new("tests/config.yaml").unwrap();
let config = ConfigV1::new("tests/config.yaml").unwrap();
assert_eq!(config.base.version, 1);
assert_eq!(config.base.log.unwrap(), "disable");
assert_eq!(config.base.servers.len(), 5);
assert_eq!(config.base.servers.len(), 3);
assert_eq!(config.base.upstream.len(), 3 + 2); // Add ban and echo upstreams
}
}

3
src/config/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod config_v1;
pub(crate) use config_v1::ConfigV1;
pub(crate) use config_v1::ParsedConfigV1;

View File

@ -1,18 +1,26 @@
mod config;
mod plugins;
mod servers;
mod upstreams;
use crate::config::Config;
use crate::config::ConfigV1;
use crate::servers::Server;
use log::{debug, error};
use std::env;
use std::path::Path;
use log::{debug, error, info};
use std::path::PathBuf;
fn main() {
let config_path = find_config();
let config_path = match find_config() {
Ok(p) => p,
Err(paths) => {
println!("Could not find config file. Tried paths:");
for p in paths {
println!("- {}", p);
}
std::process::exit(1);
}
};
let config = match Config::new(&config_path) {
let config = match ConfigV1::new(&config_path) {
Ok(config) => config,
Err(e) => {
println!("Could not load config: {:?}", e);
@ -21,24 +29,43 @@ fn main() {
};
debug!("{:?}", config);
let mut server = Server::new(config.base);
let mut server = Server::new_from_v1_config(config.base);
debug!("{:?}", server);
let _ = server.run();
error!("Server ended with errors");
}
fn find_config() -> String {
let config_path =
env::var("FOURTH_CONFIG").unwrap_or_else(|_| "/etc/fourth/config.yaml".to_string());
fn find_config() -> Result<String, Vec<String>> {
let possible_locations = ["/etc/l4p", ""];
let possible_names = ["l4p.yaml", "config.yaml"];
if Path::new(&config_path).exists() {
return config_path;
let mut tried_paths = Vec::<String>::new();
let mut possible_paths = Vec::<PathBuf>::new();
if let Ok(env_path) = std::env::var("L4P_CONFIG") {
possible_paths.push(PathBuf::from(env_path));
}
if Path::new("config.yaml").exists() {
return String::from("config.yaml");
possible_paths.append(
&mut possible_locations
.iter()
.flat_map(|&path| {
possible_names
.iter()
.map(move |&file| PathBuf::new().join(path).join(file))
})
.collect::<Vec<PathBuf>>(),
);
for path in possible_paths {
let path_str = path.to_string_lossy().to_string();
if path.exists() {
return Ok(path_str);
}
String::from("")
tried_paths.push(path_str);
}
Err(tried_paths)
}

View File

@ -1,110 +0,0 @@
use std::{io::Write, time::Duration};
use kcp::Kcp;
/// Kcp Delay Config
#[derive(Debug, Clone, Copy)]
pub struct KcpNoDelayConfig {
/// Enable nodelay
pub nodelay: bool,
/// Internal update interval (ms)
pub interval: i32,
/// ACK number to enable fast resend
pub resend: i32,
/// Disable congetion control
pub nc: bool,
}
impl Default for KcpNoDelayConfig {
fn default() -> KcpNoDelayConfig {
KcpNoDelayConfig {
nodelay: false,
interval: 100,
resend: 0,
nc: false,
}
}
}
#[allow(unused)]
impl KcpNoDelayConfig {
/// Get a fastest configuration
///
/// 1. Enable NoDelay
/// 2. Set ticking interval to be 10ms
/// 3. Set fast resend to be 2
/// 4. Disable congestion control
pub fn fastest() -> KcpNoDelayConfig {
KcpNoDelayConfig {
nodelay: true,
interval: 10,
resend: 2,
nc: true,
}
}
/// Get a normal configuration
///
/// 1. Disable NoDelay
/// 2. Set ticking interval to be 40ms
/// 3. Disable fast resend
/// 4. Enable congestion control
pub fn normal() -> KcpNoDelayConfig {
KcpNoDelayConfig {
nodelay: false,
interval: 40,
resend: 0,
nc: false,
}
}
}
/// Kcp Config
#[derive(Debug, Clone, Copy)]
pub struct KcpConfig {
/// Max Transmission Unit
pub mtu: usize,
/// nodelay
pub nodelay: KcpNoDelayConfig,
/// Send window size
pub wnd_size: (u16, u16),
/// Session expire duration, default is 90 seconds
pub session_expire: Duration,
/// Flush KCP state immediately after write
pub flush_write: bool,
/// Flush ACKs immediately after input
pub flush_acks_input: bool,
/// Stream mode
pub stream: bool,
}
impl Default for KcpConfig {
fn default() -> KcpConfig {
KcpConfig {
mtu: 1400,
nodelay: KcpNoDelayConfig::normal(),
wnd_size: (256, 256),
session_expire: Duration::from_secs(90),
flush_write: false,
flush_acks_input: false,
stream: true,
}
}
}
impl KcpConfig {
/// Applies config onto `Kcp`
#[doc(hidden)]
pub fn apply_config<W: Write>(&self, k: &mut Kcp<W>) {
k.set_mtu(self.mtu).expect("invalid MTU");
k.set_nodelay(
self.nodelay.nodelay,
self.nodelay.interval,
self.nodelay.resend,
self.nodelay.nc,
);
k.set_wndsize(self.wnd_size.0, self.wnd_size.1);
}
}

View File

@ -1,128 +0,0 @@
use std::{
io::{self, ErrorKind},
net::SocketAddr,
sync::Arc,
time::Duration,
};
use byte_string::ByteStr;
use kcp::{Error as KcpError, KcpResult};
use log::{debug, error, trace};
use tokio::{
net::{ToSocketAddrs, UdpSocket},
sync::mpsc,
task::JoinHandle,
time,
};
use crate::plugins::kcp::{config::KcpConfig, session::KcpSessionManager, stream::KcpStream};
#[allow(unused)]
pub struct KcpListener {
udp: Arc<UdpSocket>,
accept_rx: mpsc::Receiver<(KcpStream, SocketAddr)>,
task_watcher: JoinHandle<()>,
}
impl Drop for KcpListener {
fn drop(&mut self) {
self.task_watcher.abort();
}
}
impl KcpListener {
pub async fn bind<A: ToSocketAddrs>(config: KcpConfig, addr: A) -> KcpResult<KcpListener> {
let udp = UdpSocket::bind(addr).await?;
let udp = Arc::new(udp);
let server_udp = udp.clone();
let (accept_tx, accept_rx) = mpsc::channel(1024 /* backlogs */);
let task_watcher = tokio::spawn(async move {
let (close_tx, mut close_rx) = mpsc::channel(64);
let mut sessions = KcpSessionManager::new();
let mut packet_buffer = [0u8; 65536];
loop {
tokio::select! {
conv = close_rx.recv() => {
let conv = conv.expect("close_tx closed unexpectly");
sessions.close_conv(conv);
trace!("session conv: {} removed", conv);
}
recv_res = udp.recv_from(&mut packet_buffer) => {
match recv_res {
Err(err) => {
error!("udp.recv_from failed, error: {}", err);
time::sleep(Duration::from_secs(1)).await;
}
Ok((n, peer_addr)) => {
let packet = &mut packet_buffer[..n];
log::trace!("received peer: {}, {:?}", peer_addr, ByteStr::new(packet));
let mut conv = kcp::get_conv(packet);
if conv == 0 {
// Allocate a conv for client.
conv = sessions.alloc_conv();
debug!("allocate {} conv for peer: {}", conv, peer_addr);
kcp::set_conv(packet, conv);
}
let session = match sessions.get_or_create(&config, conv, &udp, peer_addr, &close_tx) {
Ok((s, created)) => {
if created {
// Created a new session, constructed a new accepted client
let stream = KcpStream::with_session(s.clone());
if let Err(..) = accept_tx.try_send((stream, peer_addr)) {
debug!("failed to create accepted stream due to channel failure");
// remove it from session
sessions.close_conv(conv);
continue;
}
}
s
},
Err(err) => {
error!("failed to create session, error: {}, peer: {}, conv: {}", err, peer_addr, conv);
continue;
}
};
// let mut kcp = session.kcp_socket().lock().await;
// if let Err(err) = kcp.input(packet) {
// error!("kcp.input failed, peer: {}, conv: {}, error: {}, packet: {:?}", peer_addr, conv, err, ByteStr::new(packet));
// }
session.input(packet).await;
}
}
}
}
}
});
Ok(KcpListener {
udp: server_udp,
accept_rx,
task_watcher,
})
}
pub async fn accept(&mut self) -> KcpResult<(KcpStream, SocketAddr)> {
match self.accept_rx.recv().await {
Some(s) => Ok(s),
None => Err(KcpError::IoError(io::Error::new(
ErrorKind::Other,
"accept channel closed unexpectly",
))),
}
}
#[allow(unused)]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.udp.local_addr()
}
}

View File

@ -1,14 +0,0 @@
//! Library of KCP on Tokio
pub use self::{
config::{KcpConfig, KcpNoDelayConfig},
listener::KcpListener,
stream::KcpStream,
};
mod config;
mod listener;
mod session;
mod skcp;
mod stream;
mod utils;

View File

@ -1,256 +0,0 @@
use std::{
collections::{hash_map::Entry, HashMap},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use byte_string::ByteStr;
use kcp::KcpResult;
use log::{error, trace};
use tokio::{
net::UdpSocket,
sync::{mpsc, Mutex},
time::{self, Instant},
};
use crate::plugins::kcp::{skcp::KcpSocket, KcpConfig};
pub struct KcpSession {
socket: Mutex<KcpSocket>,
closed: AtomicBool,
session_expire: Duration,
session_close_notifier: Option<mpsc::Sender<u32>>,
input_tx: mpsc::Sender<Vec<u8>>,
}
impl KcpSession {
fn new(
socket: KcpSocket,
session_expire: Duration,
session_close_notifier: Option<mpsc::Sender<u32>>,
input_tx: mpsc::Sender<Vec<u8>>,
) -> KcpSession {
KcpSession {
socket: Mutex::new(socket),
closed: AtomicBool::new(false),
session_expire,
session_close_notifier,
input_tx,
}
}
pub fn new_shared(
socket: KcpSocket,
session_expire: Duration,
session_close_notifier: Option<mpsc::Sender<u32>>,
) -> Arc<KcpSession> {
let is_client = session_close_notifier.is_none();
let (input_tx, mut input_rx) = mpsc::channel(64);
let udp_socket = socket.udp_socket().clone();
let session = Arc::new(KcpSession::new(
socket,
session_expire,
session_close_notifier,
input_tx,
));
{
let session = session.clone();
tokio::spawn(async move {
let mut input_buffer = [0u8; 65536];
let update_timer = time::sleep(Duration::from_millis(10));
tokio::pin!(update_timer);
loop {
tokio::select! {
// recv() then input()
// Drives the KCP machine forward
recv_result = udp_socket.recv(&mut input_buffer), if is_client => {
match recv_result {
Err(err) => {
error!("[SESSION] UDP recv failed, error: {}", err);
}
Ok(n) => {
let input_buffer = &input_buffer[..n];
trace!("[SESSION] UDP recv {} bytes, going to input {:?}", n, ByteStr::new(input_buffer));
let mut socket = session.socket.lock().await;
match socket.input(input_buffer) {
Ok(true) => {
trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
}
Ok(false) => {}
Err(err) => {
error!("[SESSION] UDP input {} bytes error: {}, input buffer {:?}", n, err, ByteStr::new(input_buffer));
}
}
}
}
}
// bytes received from listener socket
input_opt = input_rx.recv() => {
if let Some(input_buffer) = input_opt {
let mut socket = session.socket.lock().await;
match socket.input(&input_buffer) {
Ok(..) => {
trace!("[SESSION] UDP input {} bytes from channel {:?}", input_buffer.len(), ByteStr::new(&input_buffer));
}
Err(err) => {
error!("[SESSION] UDP input {} bytes from channel failed, error: {}, input buffer {:?}",
input_buffer.len(), err, ByteStr::new(&input_buffer));
}
}
}
}
// Call update() in period
_ = &mut update_timer => {
let mut socket = session.socket.lock().await;
let is_closed = session.closed.load(Ordering::Acquire);
if is_closed && socket.can_close() {
trace!("[SESSION] KCP session closed");
break;
}
// server socket expires
if !is_client {
// If this is a server stream, close it automatically after a period of time
let last_update_time = socket.last_update_time();
let elapsed = last_update_time.elapsed();
if elapsed > session.session_expire {
if elapsed > session.session_expire * 2 {
// Force close. Client may have already gone.
trace!(
"[SESSION] force close inactive session, conv: {}, last_update: {}s ago",
socket.conv(),
elapsed.as_secs()
);
break;
}
if !is_closed {
trace!(
"[SESSION] closing inactive session, conv: {}, last_update: {}s ago",
socket.conv(),
elapsed.as_secs()
);
session.closed.store(true, Ordering::Release);
}
}
}
match socket.update() {
Ok(next_next) => {
update_timer.as_mut().reset(Instant::from_std(next_next));
}
Err(err) => {
error!("[SESSION] KCP update failed, error: {}", err);
update_timer.as_mut().reset(Instant::now() + Duration::from_millis(10));
}
}
}
}
}
{
// Close the socket.
// Wake all pending tasks and let all send/recv return EOF
let mut socket = session.socket.lock().await;
socket.close();
}
if let Some(ref notifier) = session.session_close_notifier {
let socket = session.socket.lock().await;
let _ = notifier.send(socket.conv()).await;
}
});
}
session
}
pub fn kcp_socket(&self) -> &Mutex<KcpSocket> {
&self.socket
}
pub fn close(&self) {
self.closed.store(true, Ordering::Release);
}
pub async fn input(&self, buf: &[u8]) {
self.input_tx
.send(buf.to_owned())
.await
.expect("input channel closed")
}
}
pub struct KcpSessionManager {
sessions: HashMap<u32, Arc<KcpSession>>,
next_free_conv: u32,
}
impl KcpSessionManager {
pub fn new() -> KcpSessionManager {
KcpSessionManager {
sessions: HashMap::new(),
next_free_conv: 0,
}
}
pub fn close_conv(&mut self, conv: u32) {
self.sessions.remove(&conv);
}
pub fn alloc_conv(&mut self) -> u32 {
loop {
let (mut c, _) = self.next_free_conv.overflowing_add(1);
if c == 0 {
let (nc, _) = c.overflowing_add(1);
c = nc;
}
self.next_free_conv = c;
if self.sessions.get(&self.next_free_conv).is_none() {
let conv = self.next_free_conv;
return conv;
}
}
}
pub fn get_or_create(
&mut self,
config: &KcpConfig,
conv: u32,
udp: &Arc<UdpSocket>,
peer_addr: SocketAddr,
session_close_notifier: &mpsc::Sender<u32>,
) -> KcpResult<(Arc<KcpSession>, bool)> {
match self.sessions.entry(conv) {
Entry::Occupied(occ) => Ok((occ.get().clone(), false)),
Entry::Vacant(vac) => {
let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?;
let session = KcpSession::new_shared(
socket,
config.session_expire,
Some(session_close_notifier.clone()),
);
trace!("created session for conv: {}, peer: {}", conv, peer_addr);
vac.insert(session.clone());
Ok((session, true))
}
}
}
}

View File

@ -1,288 +0,0 @@
use std::{
io::{self, ErrorKind, Write},
net::SocketAddr,
sync::Arc,
task::{Context, Poll, Waker},
time::{Duration, Instant},
};
use futures::future;
use kcp::{Error as KcpError, Kcp, KcpResult};
use log::{error, trace};
use tokio::{net::UdpSocket, sync::mpsc};
use crate::plugins::kcp::{utils::now_millis, KcpConfig};
/// Writer for sending packets to the underlying UdpSocket
struct UdpOutput {
socket: Arc<UdpSocket>,
target_addr: SocketAddr,
delay_tx: mpsc::UnboundedSender<Vec<u8>>,
}
impl UdpOutput {
/// Create a new Writer for writing packets to UdpSocket
pub fn new(socket: Arc<UdpSocket>, target_addr: SocketAddr) -> UdpOutput {
let (delay_tx, mut delay_rx) = mpsc::unbounded_channel::<Vec<u8>>();
{
let socket = socket.clone();
tokio::spawn(async move {
while let Some(buf) = delay_rx.recv().await {
if let Err(err) = socket.send_to(&buf, target_addr).await {
error!("[SEND] UDP delayed send failed, error: {}", err);
}
}
});
}
UdpOutput {
socket,
target_addr,
delay_tx,
}
}
}
impl Write for UdpOutput {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.socket.try_send_to(buf, self.target_addr) {
Ok(n) => Ok(n),
Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
// send return EAGAIN
// ignored as packet was lost in transmission
trace!(
"[SEND] UDP send EAGAIN, packet.size: {} bytes, delayed send",
buf.len()
);
self.delay_tx
.send(buf.to_owned())
.expect("channel closed unexpectly");
Ok(buf.len())
}
Err(err) => Err(err),
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub struct KcpSocket {
kcp: Kcp<UdpOutput>,
last_update: Instant,
socket: Arc<UdpSocket>,
flush_write: bool,
flush_ack_input: bool,
sent_first: bool,
pending_sender: Option<Waker>,
pending_receiver: Option<Waker>,
closed: bool,
}
impl KcpSocket {
pub fn new(
c: &KcpConfig,
conv: u32,
socket: Arc<UdpSocket>,
target_addr: SocketAddr,
stream: bool,
) -> KcpResult<KcpSocket> {
let output = UdpOutput::new(socket.clone(), target_addr);
let mut kcp = if stream {
Kcp::new_stream(conv, output)
} else {
Kcp::new(conv, output)
};
c.apply_config(&mut kcp);
// Ask server to allocate one
if conv == 0 {
kcp.input_conv();
}
kcp.update(now_millis())?;
Ok(KcpSocket {
kcp,
last_update: Instant::now(),
socket,
flush_write: c.flush_write,
flush_ack_input: c.flush_acks_input,
sent_first: false,
pending_sender: None,
pending_receiver: None,
closed: false,
})
}
/// Call every time you got data from transmission
pub fn input(&mut self, buf: &[u8]) -> KcpResult<bool> {
match self.kcp.input(buf) {
Ok(..) => {}
Err(KcpError::ConvInconsistent(expected, actual)) => {
trace!(
"[INPUT] Conv expected={} actual={} ignored",
expected,
actual
);
return Ok(false);
}
Err(err) => return Err(err),
}
self.last_update = Instant::now();
if self.flush_ack_input {
self.kcp.flush_ack()?;
}
Ok(self.try_wake_pending_waker())
}
/// Call if you want to send some data
pub fn poll_send(&mut self, cx: &mut Context<'_>, mut buf: &[u8]) -> Poll<KcpResult<usize>> {
if self.closed {
return Ok(0).into();
}
// If:
// 1. Have sent the first packet (asking for conv)
// 2. Too many pending packets
if self.sent_first
&& (self.kcp.wait_snd() >= self.kcp.snd_wnd() as usize || self.kcp.waiting_conv())
{
trace!(
"[SEND] waitsnd={} sndwnd={} excceeded or waiting conv={}",
self.kcp.wait_snd(),
self.kcp.snd_wnd(),
self.kcp.waiting_conv()
);
self.pending_sender = Some(cx.waker().clone());
return Poll::Pending;
}
if !self.sent_first && self.kcp.waiting_conv() && buf.len() > self.kcp.mss() as usize {
buf = &buf[..self.kcp.mss() as usize];
}
let n = self.kcp.send(buf)?;
self.sent_first = true;
self.last_update = Instant::now();
if self.flush_write {
self.kcp.flush()?;
}
Ok(n).into()
}
/// Call if you want to send some data
#[allow(dead_code)]
pub async fn send(&mut self, buf: &[u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_send(cx, buf)).await
}
#[allow(dead_code)]
pub fn try_recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
if self.closed {
return Ok(0);
}
self.kcp.recv(buf)
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<KcpResult<usize>> {
if self.closed {
return Ok(0).into();
}
match self.kcp.recv(buf) {
Ok(n) => Ok(n).into(),
Err(KcpError::RecvQueueEmpty) => {
self.pending_receiver = Some(cx.waker().clone());
Poll::Pending
}
Err(err) => Err(err).into(),
}
}
#[allow(dead_code)]
pub async fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_recv(cx, buf)).await
}
pub fn flush(&mut self) -> KcpResult<()> {
self.kcp.flush()?;
self.last_update = Instant::now();
Ok(())
}
fn try_wake_pending_waker(&mut self) -> bool {
let mut waked = false;
if self.pending_sender.is_some()
&& self.kcp.wait_snd() < self.kcp.snd_wnd() as usize
&& !self.kcp.waiting_conv()
{
let waker = self.pending_sender.take().unwrap();
waker.wake();
waked = true;
}
if self.pending_receiver.is_some() {
if let Ok(peek) = self.kcp.peeksize() {
if peek > 0 {
let waker = self.pending_receiver.take().unwrap();
waker.wake();
waked = true;
}
}
}
waked
}
pub fn update(&mut self) -> KcpResult<Instant> {
let now = now_millis();
self.kcp.update(now)?;
let next = self.kcp.check(now);
self.try_wake_pending_waker();
Ok(Instant::now() + Duration::from_millis(next as u64))
}
pub fn close(&mut self) {
self.closed = true;
if let Some(w) = self.pending_sender.take() {
w.wake();
}
if let Some(w) = self.pending_receiver.take() {
w.wake();
}
}
pub fn udp_socket(&self) -> &Arc<UdpSocket> {
&self.socket
}
pub fn can_close(&self) -> bool {
self.kcp.wait_snd() == 0
}
pub fn conv(&self) -> u32 {
self.kcp.conv()
}
pub fn peek_size(&self) -> KcpResult<usize> {
self.kcp.peeksize()
}
pub fn last_update_time(&self) -> Instant {
self.last_update
}
}

View File

@ -1,183 +0,0 @@
use std::{
io::{self, ErrorKind},
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{future, ready};
use kcp::{Error as KcpError, KcpResult};
use log::trace;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UdpSocket,
};
use crate::plugins::kcp::{config::KcpConfig, session::KcpSession, skcp::KcpSocket};
pub struct KcpStream {
session: Arc<KcpSession>,
recv_buffer: Vec<u8>,
recv_buffer_pos: usize,
recv_buffer_cap: usize,
}
impl Drop for KcpStream {
fn drop(&mut self) {
self.session.close();
}
}
#[allow(unused)]
impl KcpStream {
pub async fn connect(config: &KcpConfig, addr: SocketAddr) -> KcpResult<KcpStream> {
let udp = match addr.ip() {
IpAddr::V4(..) => UdpSocket::bind("0.0.0.0:0").await?,
IpAddr::V6(..) => UdpSocket::bind("[::]:0").await?,
};
let udp = Arc::new(udp);
let socket = KcpSocket::new(config, 0, udp, addr, config.stream)?;
let session = KcpSession::new_shared(socket, config.session_expire, None);
Ok(KcpStream::with_session(session))
}
pub(crate) fn with_session(session: Arc<KcpSession>) -> KcpStream {
KcpStream {
session,
recv_buffer: Vec::new(),
recv_buffer_pos: 0,
recv_buffer_cap: 0,
}
}
pub fn poll_send(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<KcpResult<usize>> {
// Mutex doesn't have poll_lock, spinning on it.
let socket = self.session.kcp_socket();
let mut kcp = match socket.try_lock() {
Ok(guard) => guard,
Err(..) => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
};
kcp.poll_send(cx, buf)
}
pub async fn send(&mut self, buf: &[u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_send(cx, buf)).await
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<KcpResult<usize>> {
loop {
// Consumes all data in buffer
if self.recv_buffer_pos < self.recv_buffer_cap {
let remaining = self.recv_buffer_cap - self.recv_buffer_pos;
let copy_length = remaining.min(buf.len());
buf.copy_from_slice(
&self.recv_buffer[self.recv_buffer_pos..self.recv_buffer_pos + copy_length],
);
self.recv_buffer_pos += copy_length;
return Ok(copy_length).into();
}
// Mutex doesn't have poll_lock, spinning on it.
let socket = self.session.kcp_socket();
let mut kcp = match socket.try_lock() {
Ok(guard) => guard,
Err(..) => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
};
// Try to read from KCP
// 1. Read directly with user provided `buf`
match ready!(kcp.poll_recv(cx, buf)) {
Ok(n) => {
trace!("[CLIENT] recv directly {} bytes", n);
return Ok(n).into();
}
Err(KcpError::UserBufTooSmall) => {}
Err(err) => return Err(err).into(),
}
// 2. User `buf` too small, read to recv_buffer
let required_size = kcp.peek_size()?;
if self.recv_buffer.len() < required_size {
self.recv_buffer.resize(required_size, 0);
}
match ready!(kcp.poll_recv(cx, &mut self.recv_buffer)) {
Ok(n) => {
trace!("[CLIENT] recv buffered {} bytes", n);
self.recv_buffer_pos = 0;
self.recv_buffer_cap = n;
}
Err(err) => return Err(err).into(),
}
}
}
pub async fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_recv(cx, buf)).await
}
}
impl AsyncRead for KcpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match ready!(self.poll_recv(cx, buf.initialize_unfilled())) {
Ok(n) => {
buf.advance(n);
Ok(()).into()
}
Err(KcpError::IoError(err)) => Err(err).into(),
Err(err) => Err(io::Error::new(ErrorKind::Other, err)).into(),
}
}
}
impl AsyncWrite for KcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match ready!(self.poll_send(cx, buf)) {
Ok(n) => Ok(n).into(),
Err(KcpError::IoError(err)) => Err(err).into(),
Err(err) => Err(io::Error::new(ErrorKind::Other, err)).into(),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// Mutex doesn't have poll_lock, spinning on it.
let socket = self.session.kcp_socket();
let mut kcp = match socket.try_lock() {
Ok(guard) => guard,
Err(..) => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
};
match kcp.flush() {
Ok(..) => Ok(()).into(),
Err(KcpError::IoError(err)) => Err(err).into(),
Err(err) => Err(io::Error::new(ErrorKind::Other, err)).into(),
}
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Ok(()).into()
}
}

View File

@ -1,10 +0,0 @@
use std::time::{SystemTime, UNIX_EPOCH};
#[inline]
pub fn now_millis() -> u32 {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("time went afterwards");
(since_the_epoch.as_secs() * 1000 + since_the_epoch.subsec_millis() as u64 / 1_000_000) as u32
}

View File

@ -1 +0,0 @@
//pub mod kcp;

View File

@ -2,12 +2,14 @@ use log::{error, info};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::task::JoinHandle;
mod protocol;
pub(crate) mod upstream_address;
use crate::config::{ParsedConfig, Upstream};
use crate::config::ParsedConfigV1;
use crate::upstreams::Upstream;
use protocol::tcp;
#[derive(Debug)]
@ -27,7 +29,7 @@ pub(crate) struct Proxy {
}
impl Server {
pub fn new(config: ParsedConfig) -> Self {
pub fn new_from_v1_config(config: ParsedConfigV1) -> Self {
let mut new_server = Server {
proxies: Vec::new(),
};
@ -82,30 +84,12 @@ impl Server {
);
let handle = tokio::spawn(async move {
match config.protocol.as_ref() {
"tcp" => {
"tcp" | "tcp4" | "tcp6" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
"tcp4" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
"tcp6" => {
let res = tcp::proxy(config.clone()).await;
if res.is_err() {
error!("Failed to start {}: {}", config.name, res.err().unwrap());
}
}
// "kcp" => {
// let res = kcp::proxy(config.clone()).await;
// if res.is_err() {
// error!("Failed to start {}: {}", config.name, res.err().unwrap());
// }
// }
_ => {
error!("Invalid protocol: {}", config.protocol)
}
@ -123,7 +107,6 @@ impl Server {
#[cfg(test)]
mod tests {
//use crate::plugins::kcp::{KcpConfig, KcpStream};
use std::thread::{self, sleep};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -140,7 +123,7 @@ mod tests {
let mut buf = [0u8; 2];
let mut n = stream.read(&mut buf).await.unwrap();
while n > 0 {
stream.write(b"hello").await.unwrap();
let _ = stream.write(b"hello").await.unwrap();
if buf.eq(b"by") {
stream.shutdown().await.unwrap();
break;
@ -153,9 +136,9 @@ mod tests {
#[tokio::test]
async fn test_proxy() {
use crate::config::Config;
let config = Config::new("tests/config.yaml").unwrap();
let mut server = Server::new(config.base);
use crate::config::ConfigV1;
let config = ConfigV1::new("tests/config.yaml").unwrap();
let mut server = Server::new_from_v1_config(config.base);
thread::spawn(move || {
tcp_mock_server();
});
@ -170,8 +153,8 @@ mod tests {
.await
.unwrap();
let mut buf = [0u8; 5];
conn.write(b"hi").await.unwrap();
conn.read(&mut buf).await.unwrap();
let _ = conn.write(b"hi").await.unwrap();
let _ = conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
conn.shutdown().await.unwrap();
@ -181,32 +164,10 @@ mod tests {
.unwrap();
let mut buf = [0u8; 1];
for i in 0..=10u8 {
conn.write(&[i]).await.unwrap();
conn.read(&mut buf).await.unwrap();
let _ = conn.write(&[i]).await.unwrap();
let _ = conn.read(&mut buf).await.unwrap();
assert_eq!(&buf, &[i]);
}
conn.shutdown().await.unwrap();
// test KCP echo
// let kcp_config = KcpConfig::default();
// let server_addr: SocketAddr = "127.0.0.1:54959".parse().unwrap();
// let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
// let mut buf = [0u8; 1];
// for i in 0..=10u8 {
// conn.write(&[i]).await.unwrap();
// conn.read(&mut buf).await.unwrap();
// assert_eq!(&buf, &[i]);
// }
// conn.shutdown().await.unwrap();
//
// // test KCP proxy and close mock server
// let kcp_config = KcpConfig::default();
// let server_addr: SocketAddr = "127.0.0.1:54958".parse().unwrap();
// let mut conn = KcpStream::connect(&kcp_config, server_addr).await.unwrap();
// let mut buf = [0u8; 5];
// conn.write(b"by").await.unwrap();
// conn.read(&mut buf).await.unwrap();
// assert_eq!(&buf, b"hello");
// conn.shutdown().await.unwrap();
}
}

View File

@ -1,112 +0,0 @@
use crate::config::Upstream;
use crate::plugins::kcp::{KcpConfig, KcpListener, KcpStream};
use crate::servers::Proxy;
use futures::future::try_join;
use log::{debug, error, warn};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
pub async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
let kcp_config = KcpConfig::default();
let mut listener = KcpListener::bind(kcp_config, config.listen).await?;
let config = config.clone();
loop {
let thread_proxy = config.clone();
match listener.accept().await {
Err(err) => {
error!("Failed to accept connection: {}", err);
return Err(Box::new(err));
}
Ok((stream, peer)) => {
tokio::spawn(async move {
match accept(stream, peer, thread_proxy).await {
Ok(_) => {}
Err(err) => {
error!("Relay thread returned an error: {}", err);
}
};
});
}
}
}
}
async fn accept(
inbound: KcpStream,
peer: SocketAddr,
proxy: Arc<Proxy>,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("New connection from {:?}", peer);
let upstream_name = proxy.default.clone();
debug!("Upstream: {}", upstream_name);
let upstream = match proxy.upstream.get(&upstream_name) {
Some(upstream) => upstream,
None => {
warn!(
"No upstream named {:?} on server {:?}",
proxy.default, proxy.name
);
return process(inbound, proxy.upstream.get(&proxy.default).unwrap()).await;
// ToDo: Remove unwrap and check default option
}
};
return process(inbound, upstream).await;
}
async fn process(
mut inbound: KcpStream,
upstream: &Upstream,
) -> Result<(), Box<dyn std::error::Error>> {
match upstream {
Upstream::Ban => {
let _ = inbound.shutdown();
}
Upstream::Echo => {
let (mut ri, mut wi) = io::split(inbound);
let inbound_to_inbound = copy(&mut ri, &mut wi);
let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx);
}
Upstream::Custom(custom) => match custom.protocol.as_ref() {
"tcp" => {
let outbound = TcpStream::connect(custom.addr.clone()).await?;
let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound);
let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) =
try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
}
_ => {
error!("Reached unknown protocol: {:?}", custom.protocol);
}
},
};
Ok(())
}
async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
where
R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
match io::copy(reader, writer).await {
Ok(u64) => {
let _ = writer.shutdown().await;
Ok(u64)
}
Err(_) => Ok(0),
}
}

View File

@ -1,3 +1,2 @@
//pub mod kcp;
pub mod tcp;
pub mod tls;

View File

@ -1,14 +1,11 @@
use crate::config::Upstream;
use crate::servers::protocol::tls::get_sni;
use crate::servers::Proxy;
use futures::future::try_join;
use log::{debug, error, info, warn};
use std::error::Error;
use std::sync::Arc;
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(config.listen).await?;
let config = config.clone();
@ -33,7 +30,7 @@ pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn std::error::
}
}
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std::error::Error>> {
async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn Error>> {
info!("New connection from {:?}", inbound.peer_addr()?);
let upstream_name = match proxy.tls {
@ -72,65 +69,9 @@ async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn std
"No upstream named {:?} on server {:?}",
proxy.default_action, proxy.name
);
return process(inbound, proxy.upstream.get(&proxy.default_action).unwrap()).await;
// ToDo: Remove unwrap and check default option
proxy.upstream.get(&proxy.default_action).unwrap()
}
};
return process(inbound, &upstream).await;
}
async fn process(
mut inbound: TcpStream,
upstream: &Upstream,
) -> Result<(), Box<dyn std::error::Error>> {
match upstream {
Upstream::Ban => {
let _ = inbound.shutdown();
}
Upstream::Echo => {
let (mut ri, mut wi) = io::split(inbound);
let inbound_to_inbound = copy(&mut ri, &mut wi);
let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx);
}
Upstream::Custom(custom) => {
let outbound = match custom.protocol.as_ref() {
"tcp4" | "tcp6" | "tcp" => {
TcpStream::connect(custom.resolve_addresses().await?.as_slice()).await?
}
_ => {
error!("Reached unknown protocol: {:?}", custom.protocol);
return Err("Reached unknown protocol".into());
}
};
debug!("Connected to {:?}", outbound.peer_addr().unwrap());
let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound);
let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
}
};
Ok(())
}
async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
where
R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
match io::copy(reader, writer).await {
Ok(u64) => {
let _ = writer.shutdown().await;
Ok(u64)
}
Err(_) => Ok(0),
}
upstream.process(inbound).await
}

View File

@ -99,6 +99,6 @@ mod tests {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
let sni = get_sni(&BUF);
assert!(sni[0] == "www.lirui.tech".to_string());
assert!(sni[0] == *"www.lirui.tech");
}
}

View File

@ -2,14 +2,16 @@ use log::debug;
use std::fmt::{Display, Formatter};
use std::io::Result;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::RwLock;
use time::{Duration, Instant, OffsetDateTime};
#[derive(Debug, Clone, Default)]
pub(crate) struct UpstreamAddress {
address: String,
resolved_addresses: Vec<SocketAddr>,
resolved_time: Option<Instant>,
ttl: Option<Duration>,
resolved_addresses: Arc<RwLock<Vec<SocketAddr>>>,
resolved_time: Arc<RwLock<Option<Instant>>>,
ttl: Arc<RwLock<Option<Duration>>>,
}
impl Display for UpstreamAddress {
@ -27,8 +29,10 @@ impl UpstreamAddress {
}
pub fn is_valid(&self) -> bool {
if let Some(resolved) = self.resolved_time {
if let Some(ttl) = self.ttl {
let r = { *self.resolved_time.read().unwrap() };
if let Some(resolved) = r {
if let Some(ttl) = { *self.ttl.read().unwrap() } {
return resolved.elapsed() < ttl;
}
}
@ -37,7 +41,7 @@ impl UpstreamAddress {
}
fn is_resolved(&self) -> bool {
self.resolved_addresses.len() > 0
!self.resolved_addresses.read().unwrap().is_empty()
}
fn time_remaining(&self) -> Duration {
@ -45,17 +49,19 @@ impl UpstreamAddress {
return Duration::seconds(0);
}
self.ttl.unwrap() - self.resolved_time.unwrap().elapsed()
let rt = { *self.resolved_time.read().unwrap() };
let ttl = { *self.ttl.read().unwrap() };
ttl.unwrap() - rt.unwrap().elapsed()
}
pub async fn resolve(&mut self, mode: ResolutionMode) -> Result<Vec<SocketAddr>> {
pub async fn resolve(&self, mode: ResolutionMode) -> Result<Vec<SocketAddr>> {
if self.is_resolved() && self.is_valid() {
debug!(
"Already got address {:?}, still valid for {:.3}s",
&self.resolved_addresses,
self.time_remaining().as_seconds_f64()
);
return Ok(self.resolved_addresses.clone());
return Ok(self.resolved_addresses.read().unwrap().clone());
}
debug!(
@ -70,8 +76,8 @@ impl UpstreamAddress {
Err(e) => {
debug!("Failed looking up {}: {}", &self.address, &e);
// Protect against DNS flooding. Cache the result for 1 second.
self.resolved_time = Some(Instant::now());
self.ttl = Some(Duration::seconds(3));
*self.resolved_time.write().unwrap() = Some(Instant::now());
*self.ttl.write().unwrap() = Some(Duration::seconds(3));
return Err(e);
}
};
@ -103,11 +109,11 @@ impl UpstreamAddress {
.expect("Format")
);
self.resolved_addresses = addresses;
self.resolved_time = Some(Instant::now());
self.ttl = Some(Duration::minutes(1));
*self.resolved_addresses.write().unwrap() = addresses.clone();
*self.resolved_time.write().unwrap() = Some(Instant::now());
*self.ttl.write().unwrap() = Some(Duration::minutes(1));
Ok(self.resolved_addresses.clone())
Ok(addresses)
}
}

51
src/upstreams/mod.rs Normal file
View File

@ -0,0 +1,51 @@
mod proxy_to_upstream;
use log::debug;
use serde::Deserialize;
use std::error::Error;
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
pub use crate::upstreams::proxy_to_upstream::ProxyToUpstream;
#[derive(Debug, Clone, Deserialize)]
pub enum Upstream {
Ban,
Echo,
Proxy(ProxyToUpstream),
}
impl Upstream {
pub(crate) async fn process(&self, mut inbound: TcpStream) -> Result<(), Box<dyn Error>> {
match self {
Upstream::Ban => {
inbound.shutdown().await?;
}
Upstream::Echo => {
let (mut ri, mut wi) = io::split(inbound);
let inbound_to_inbound = copy(&mut ri, &mut wi);
let bytes_tx = inbound_to_inbound.await;
debug!("Bytes read: {:?}", bytes_tx);
}
Upstream::Proxy(config) => {
config.proxy(inbound).await?;
}
};
Ok(())
}
}
async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
where
R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
match io::copy(reader, writer).await {
Ok(u64) => {
let _ = writer.shutdown().await;
Ok(u64)
}
Err(_) => Ok(0),
}
}

View File

@ -0,0 +1,57 @@
use crate::servers::upstream_address::UpstreamAddress;
use crate::upstreams::copy;
use futures::future::try_join;
use log::{debug, error};
use serde::Deserialize;
use std::net::SocketAddr;
use tokio::io;
use tokio::net::TcpStream;
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ProxyToUpstream {
pub addr: String,
pub protocol: String,
#[serde(skip_deserializing)]
addresses: UpstreamAddress,
}
impl ProxyToUpstream {
pub async fn resolve_addresses(&self) -> std::io::Result<Vec<SocketAddr>> {
self.addresses.resolve((*self.protocol).into()).await
}
pub fn new(address: String, protocol: String) -> Self {
Self {
addr: address.clone(),
protocol,
addresses: UpstreamAddress::new(address),
}
}
pub(crate) async fn proxy(&self, inbound: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let outbound = match self.protocol.as_ref() {
"tcp4" | "tcp6" | "tcp" => {
TcpStream::connect(self.resolve_addresses().await?.as_slice()).await?
}
_ => {
error!("Reached unknown protocol: {:?}", self.protocol);
return Err("Reached unknown protocol".into());
}
};
debug!("Connected to {:?}", outbound.peer_addr().unwrap());
let (mut ri, mut wi) = io::split(inbound);
let (mut ro, mut wo) = io::split(outbound);
let inbound_to_outbound = copy(&mut ri, &mut wo);
let outbound_to_inbound = copy(&mut ro, &mut wi);
let (bytes_tx, bytes_rx) = try_join(inbound_to_outbound, outbound_to_inbound).await?;
debug!("Bytes read: {:?} write: {:?}", bytes_tx, bytes_rx);
Ok(())
}
}

View File

@ -19,16 +19,6 @@ servers:
listen:
- "0.0.0.0:54956"
default: echo
kcp_server:
protocol: kcp
listen:
- "127.0.0.1:54958"
default: tester
kcp_echo_server:
protocol: kcp
listen:
- "127.0.0.1:54959"
default: echo
upstream:
web: "tcp://127.0.0.1:8080"