diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..00b0cdf9260f7d88b75c0baa8acadaf2887f833a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,123 @@ + +// use tokio::io; +// use tokio::net::TcpListener; +// use tokio::net::TcpStream; +// use std::sync::{Arc, Mutex}; +// use signal_hook::{consts::SIGUSR1, iterator::Signals}; +// use std::time::Duration; +// use tokio::time::{sleep, timeout}; + +struct UpstreamState { + is_up: bool, + address: String, +} + +struct JunctionState { + upstreams: Vec<UpstreamState>, + preference: usize, +} + +impl JunctionState { + fn current(&self) -> String { + for offset in 0..self.upstreams.len() { + let idx = (self.preference + offset) % self.upstreams.len(); + if self.upstreams[idx].is_up { + return self.upstreams[idx].address.clone(); + } + } + self.upstreams[self.preference].address.clone() // all upstreams are down + } + + fn mark(&mut self, idx: usize, is_up: bool) -> bool { + if self.upstreams[idx].is_up != is_up { + self.upstreams[idx].is_up = is_up; + return true; + } + false + } + + fn advance(&mut self) { + self.preference = (self.preference + 1) % self.upstreams.len(); + } +} + +#[cfg(test)] +mod test; + +// type SharedState = Arc<Mutex<State>>; +// +// #[tokio::main] +// async fn main() { +// let server = TcpListener::bind("0.0.0.0:8080").await.unwrap(); +// +// let state: SharedState = Arc::new(Mutex::new(State { +// a_up: false, +// b_up: false, +// prefer_a: true, +// a: "localhost:8001".to_string(), +// b: "localhost:8002".to_string(), +// })); +// +// +// { +// let state = state.clone(); +// tokio::spawn(async move { +// let mut signals = Signals::new([SIGUSR1]).unwrap(); +// for _ in signals.forever() { +// let mut state = state.lock().unwrap(); +// state.prefer_a = !state.prefer_a; +// println!("Update to server preference: prefer_a={:?}", state.prefer_a) +// } +// }); +// } +// +// { +// let state = state.clone(); +// tokio::spawn(async move { +// probe_upstream(true, state).await +// }); +// } +// { +// let state = state.clone(); +// tokio::spawn(async move { +// probe_upstream(false, state).await +// }); +// } +// +// loop { +// let (socket, _) = server.accept().await.unwrap(); +// let state = state.clone(); +// tokio::spawn(async move { +// handle_client(socket, state).await; +// }); +// } +// } +// +// async fn handle_client(client: TcpStream, state: SharedState) { +// let address = state.lock().unwrap().addr(); +// let server = TcpStream::connect(address.as_str()).await.unwrap(); +// +// let (mut eread, mut ewrite) = client.into_split(); +// let (mut oread, mut owrite) = server.into_split(); +// +// tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await }); +// tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await }); +// } +// +// async fn probe_upstream(probe_a: bool, state: SharedState) { +// loop { +// let address = match probe_a { +// true => state.lock().unwrap().a.clone(), +// false => state.lock().unwrap().b.clone(), +// }; +// let upstream = timeout( +// Duration::from_millis(1000), +// TcpStream::connect(address.as_str()) +// ).await; +// match upstream { +// Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a), +// _ => state.lock().unwrap().mark_down(probe_a), +// } +// sleep(Duration::from_millis(1000)).await; +// } +// } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5b3d8a76b30b3ace89427828df698b657bd9f898..e71fdf55421d043f171eba8c32329338498cad17 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,133 +1 @@ -use tokio::io; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use std::sync::{Arc, Mutex}; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; -use std::time::Duration; -use tokio::time::{sleep, timeout}; - -struct State { - a_up: bool, - b_up: bool, - a: String, - b: String, - prefer_a: bool -} - - -impl State { - fn use_a(&self) -> bool { - match (self.a_up, self.b_up) { - (true, true) => self.prefer_a, - (false, true) => false, - (true, false) => true, - (false, false) => self.prefer_a, - } - } - - fn addr(&self) -> String { - if self.use_a() { - self.a.clone() - } else { - self.b.clone() - } - } - - fn mark_up(&mut self, probe_a: bool) { - if probe_a && !self.a_up { - self.a_up = true; - println!("Marked a as up") - } else if !probe_a && !self.b_up { - self.b_up = true; - println!("Marked b as up") - } - } - - fn mark_down(&mut self, probe_a: bool) { - if probe_a && self.a_up { - self.a_up = false; - println!("Marked a as down") - } else if !probe_a && self.b_up { - self.b_up = false; - println!("Marked b as down") - } - } -} - -type SharedState = Arc<Mutex<State>>; - -#[tokio::main] -async fn main() { - let server = TcpListener::bind("0.0.0.0:8080").await.unwrap(); - - let state: SharedState = Arc::new(Mutex::new(State { - a_up: false, - b_up: false, - prefer_a: true, - a: "localhost:8001".to_string(), - b: "localhost:8002".to_string(), - })); - - - { - let state = state.clone(); - tokio::spawn(async move { - let mut signals = Signals::new([SIGUSR1]).unwrap(); - for _ in signals.forever() { - let mut state = state.lock().unwrap(); - state.prefer_a = !state.prefer_a; - println!("Update to server preference: prefer_a={:?}", state.prefer_a) - } - }); - } - - { - let state = state.clone(); - tokio::spawn(async move { - probe_upstream(true, state).await - }); - } - { - let state = state.clone(); - tokio::spawn(async move { - probe_upstream(false, state).await - }); - } - - loop { - let (socket, _) = server.accept().await.unwrap(); - let state = state.clone(); - tokio::spawn(async move { - handle_client(socket, state).await; - }); - } -} - -async fn handle_client(client: TcpStream, state: SharedState) { - let address = state.lock().unwrap().addr(); - let server = TcpStream::connect(address.as_str()).await.unwrap(); - - let (mut eread, mut ewrite) = client.into_split(); - let (mut oread, mut owrite) = server.into_split(); - - tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await }); - tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await }); -} - -async fn probe_upstream(probe_a: bool, state: SharedState) { - loop { - let address = match probe_a { - true => state.lock().unwrap().a.clone(), - false => state.lock().unwrap().b.clone(), - }; - let upstream = timeout( - Duration::from_millis(1000), - TcpStream::connect(address.as_str()) - ).await; - match upstream { - Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a), - _ => state.lock().unwrap().mark_down(probe_a), - } - sleep(Duration::from_millis(1000)).await; - } -} \ No newline at end of file +fn main() {} \ No newline at end of file diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000000000000000000000000000000000000..4c8dec5ced13c27db32aee017512cd1f5f769934 --- /dev/null +++ b/src/test.rs @@ -0,0 +1,138 @@ + +use super::{JunctionState, UpstreamState}; + +#[test] +fn test_current_single() { + let state = JunctionState { + preference: 0, + upstreams: vec![ + UpstreamState { + address: "server".to_string(), + is_up: true + } + ] + }; + + assert_eq!(state.current(), "server"); +} + +#[test] +fn test_current_replica() { + let mut state = JunctionState { + preference: 0, + upstreams: vec![ + UpstreamState { + address: "server-0".to_string(), + is_up: true + }, + UpstreamState { + address: "server-1".to_string(), + is_up: true + } + ] + }; + + ///////////////////////////////////////////////////// + state.preference = 0; + + // UP -- up + state.upstreams[0].is_up = true; + state.upstreams[1].is_up = true; + assert_eq!(state.current(), "server-0"); + + // DOWN -- up + state.upstreams[0].is_up = false; + state.upstreams[1].is_up = true; + assert_eq!(state.current(), "server-1"); + + // UP -- down + state.upstreams[0].is_up = true; + state.upstreams[1].is_up = false; + assert_eq!(state.current(), "server-0"); + + // DOWN -- down + state.upstreams[0].is_up = false; + state.upstreams[1].is_up = false; + assert_eq!(state.current(), "server-0"); + + ///////////////////////////////////////////////////// + state.preference = 1; + + // up -- UP + state.upstreams[0].is_up = true; + state.upstreams[1].is_up = true; + assert_eq!(state.current(), "server-1"); + + // down -- UP + state.upstreams[0].is_up = false; + state.upstreams[1].is_up = true; + assert_eq!(state.current(), "server-1"); + + // up -- DOWN + state.upstreams[0].is_up = true; + state.upstreams[1].is_up = false; + assert_eq!(state.current(), "server-0"); + + // down -- DOWN + state.upstreams[0].is_up = false; + state.upstreams[1].is_up = false; + assert_eq!(state.current(), "server-1"); +} + +#[test] +fn test_advance() { + let mut state = JunctionState { + preference: 0, + upstreams: vec![ + UpstreamState { + address: "server-0".to_string(), + is_up: true + }, + UpstreamState { + address: "server-1".to_string(), + is_up: true + } + ] + }; + + assert_eq!(state.preference, 0); + + state.advance(); + assert_eq!(state.preference, 1); + + state.advance(); + assert_eq!(state.preference, 0); +} + +#[test] +fn test_mark() { + let mut state = JunctionState { + preference: 0, + upstreams: vec![ + UpstreamState { + address: "server-0".to_string(), + is_up: true + }, + UpstreamState { + address: "server-1".to_string(), + is_up: true + } + ] + }; + + let changed = state.mark(0, false); + assert_eq!(state.upstreams[0].is_up, false); + assert_eq!(state.upstreams[1].is_up, true); + assert_eq!(changed, true); + + let changed = state.mark(0, false); + assert_eq!(changed, false); + + state.mark(1, false); + assert_eq!(state.upstreams[0].is_up, false); + assert_eq!(state.upstreams[1].is_up, false); + + state.mark(0, true); + assert_eq!(state.upstreams[0].is_up, true); + assert_eq!(state.upstreams[1].is_up, false); +}