diff --git a/Cargo.lock b/Cargo.lock index 7cacb3a53bf8052db8707cd533d41d24db8e4271..f855ac84a89720a0ecbac7c86c0ca030adc02b6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,55 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" +dependencies = [ + "anstyle", + "windows-sys 0.59.0", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -44,18 +93,76 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clap" +version = "4.5.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" + +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "gimli" version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "libc" version = "0.2.166" @@ -86,7 +193,7 @@ dependencies = [ "hermit-abi", "libc", "wasi", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -154,9 +261,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.89" @@ -172,6 +285,7 @@ dependencies = [ name = "tcpjunction" version = "0.1.0" dependencies = [ + "clap", "signal-hook", "tokio", ] @@ -189,7 +303,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -209,6 +323,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -224,6 +344,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-targets" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index c18c6f9e5e7412c55b8db2c64436966b1d4781e4..da06d3bd709a99618d3afc5251bef3b23d3fdf9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] tokio = { version = "1.41.1", features = ["rt", "rt-multi-thread", "macros", "net", "io-util", "time"] } signal-hook = "0.3.17" +clap = { version = "4.5.21", features = ["derive"] } diff --git a/src/lib.rs b/src/lib.rs index 00b0cdf9260f7d88b75c0baa8acadaf2887f833a..09ebbd05c681e702b3cb97b969873639fbb7df2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,11 @@ -// use tokio::io; -// use tokio::net::TcpListener; -// use tokio::net::TcpStream; -// use std::sync::{Arc, Mutex}; -// use signal_hook::{consts::SIGUSR1, iterator::Signals}; -// use std::time::Duration; -// use tokio::time::{sleep, timeout}; +use tokio::io; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use std::sync::{Arc, Mutex}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; +use std::time::Duration; +use tokio::time::{sleep, timeout}; struct UpstreamState { is_up: bool, @@ -36,88 +36,93 @@ impl JunctionState { false } - fn advance(&mut self) { + fn advance(&mut self) -> String { self.preference = (self.preference + 1) % self.upstreams.len(); + self.upstreams[self.preference].address.clone() + } +} + + +type SharedState = Arc<Mutex<JunctionState>>; + +pub async fn launch(bind: String, upstreams: Vec<String>) { + let server = TcpListener::bind(bind).await.unwrap(); + + let n_upstreams = upstreams.len(); + let upstreams = upstreams.iter().map(|addr| UpstreamState { + address: addr.clone(), + is_up: false, + }).collect(); + + let state: SharedState = Arc::new(Mutex::new(JunctionState { + upstreams, + preference: 0, + })); + + + // Launch Interrupt signal catcher + { + let state = state.clone(); + tokio::spawn(async move { + let mut signals = Signals::new([SIGUSR1]).unwrap(); + for _ in signals.forever() { + let mut state = state.lock().unwrap(); + let addr = state.advance(); + println!("USR1 signal received. Preferred upstream: {:?}", addr); + } + }); + } + + // Launch upstream probes + for idx in 0..n_upstreams { + let state = state.clone(); + tokio::spawn(async move { + probe_upstream(idx, state).await + }); + } + + // Listen for incoming connections + loop { + let (socket, _) = server.accept().await.unwrap(); + let state = state.clone(); + tokio::spawn(async move { + handle_client(socket, state).await; + }); + } +} + +async fn handle_client(client: TcpStream, state: SharedState) { + let address = state.lock().unwrap().current(); + let server = TcpStream::connect(address.as_str()).await.unwrap(); + + let (mut eread, mut ewrite) = client.into_split(); + let (mut oread, mut owrite) = server.into_split(); + + tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await }); + tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await }); +} + +async fn probe_upstream(upstream_idx: usize, state: SharedState) { + loop { + let address = state.lock().unwrap().upstreams[upstream_idx].address.clone(); + let upstream = timeout( + Duration::from_millis(1000), + TcpStream::connect(address.as_str()) + ).await; + let is_up = match upstream { + Ok(Ok(_)) => true, + _ => false, + }; + if state.lock().unwrap().mark(upstream_idx, is_up) { + if is_up { + println!("Upstream {:?} is now up.", address) + } else { + println!("Upstream {:?} is now down.", address) + } + } + sleep(Duration::from_millis(1000)).await; } } #[cfg(test)] mod test; - -// type SharedState = Arc<Mutex<State>>; -// -// #[tokio::main] -// async fn main() { -// let server = TcpListener::bind("0.0.0.0:8080").await.unwrap(); -// -// let state: SharedState = Arc::new(Mutex::new(State { -// a_up: false, -// b_up: false, -// prefer_a: true, -// a: "localhost:8001".to_string(), -// b: "localhost:8002".to_string(), -// })); -// -// -// { -// let state = state.clone(); -// tokio::spawn(async move { -// let mut signals = Signals::new([SIGUSR1]).unwrap(); -// for _ in signals.forever() { -// let mut state = state.lock().unwrap(); -// state.prefer_a = !state.prefer_a; -// println!("Update to server preference: prefer_a={:?}", state.prefer_a) -// } -// }); -// } -// -// { -// let state = state.clone(); -// tokio::spawn(async move { -// probe_upstream(true, state).await -// }); -// } -// { -// let state = state.clone(); -// tokio::spawn(async move { -// probe_upstream(false, state).await -// }); -// } -// -// loop { -// let (socket, _) = server.accept().await.unwrap(); -// let state = state.clone(); -// tokio::spawn(async move { -// handle_client(socket, state).await; -// }); -// } -// } -// -// async fn handle_client(client: TcpStream, state: SharedState) { -// let address = state.lock().unwrap().addr(); -// let server = TcpStream::connect(address.as_str()).await.unwrap(); -// -// let (mut eread, mut ewrite) = client.into_split(); -// let (mut oread, mut owrite) = server.into_split(); -// -// tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await }); -// tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await }); -// } -// -// async fn probe_upstream(probe_a: bool, state: SharedState) { -// loop { -// let address = match probe_a { -// true => state.lock().unwrap().a.clone(), -// false => state.lock().unwrap().b.clone(), -// }; -// let upstream = timeout( -// Duration::from_millis(1000), -// TcpStream::connect(address.as_str()) -// ).await; -// match upstream { -// Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a), -// _ => state.lock().unwrap().mark_down(probe_a), -// } -// sleep(Duration::from_millis(1000)).await; -// } -// } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 21e8033fe137f98da00058da06c2d803e35712df..1f18e49820299b258a1fc3403315603984bed65a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,18 @@ +use clap::Parser; +use tcpjunction::launch; +#[derive(Parser)] +#[command(version, about, long_about = None)] +#[command(propagate_version = true)] +struct Cli { + bind_addr: String, + upstream_addr: Vec<String>, +} + fn main() { - println!("TCP Junction") + let args = Cli::parse(); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + launch(args.bind_addr, args.upstream_addr).await + }); } \ No newline at end of file