Skip to content
Snippets Groups Projects
Verified Commit 0f250c2c authored by Frank Sauerburger's avatar Frank Sauerburger
Browse files

Implement and test k-way JunctionState

parent 872da89e
No related branches found
No related tags found
No related merge requests found
// use tokio::io;
// use tokio::net::TcpListener;
// use tokio::net::TcpStream;
// use std::sync::{Arc, Mutex};
// use signal_hook::{consts::SIGUSR1, iterator::Signals};
// use std::time::Duration;
// use tokio::time::{sleep, timeout};
struct UpstreamState {
is_up: bool,
address: String,
}
struct JunctionState {
upstreams: Vec<UpstreamState>,
preference: usize,
}
impl JunctionState {
fn current(&self) -> String {
for offset in 0..self.upstreams.len() {
let idx = (self.preference + offset) % self.upstreams.len();
if self.upstreams[idx].is_up {
return self.upstreams[idx].address.clone();
}
}
self.upstreams[self.preference].address.clone() // all upstreams are down
}
fn mark(&mut self, idx: usize, is_up: bool) -> bool {
if self.upstreams[idx].is_up != is_up {
self.upstreams[idx].is_up = is_up;
return true;
}
false
}
fn advance(&mut self) {
self.preference = (self.preference + 1) % self.upstreams.len();
}
}
#[cfg(test)]
mod test;
// type SharedState = Arc<Mutex<State>>;
//
// #[tokio::main]
// async fn main() {
// let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
//
// let state: SharedState = Arc::new(Mutex::new(State {
// a_up: false,
// b_up: false,
// prefer_a: true,
// a: "localhost:8001".to_string(),
// b: "localhost:8002".to_string(),
// }));
//
//
// {
// let state = state.clone();
// tokio::spawn(async move {
// let mut signals = Signals::new([SIGUSR1]).unwrap();
// for _ in signals.forever() {
// let mut state = state.lock().unwrap();
// state.prefer_a = !state.prefer_a;
// println!("Update to server preference: prefer_a={:?}", state.prefer_a)
// }
// });
// }
//
// {
// let state = state.clone();
// tokio::spawn(async move {
// probe_upstream(true, state).await
// });
// }
// {
// let state = state.clone();
// tokio::spawn(async move {
// probe_upstream(false, state).await
// });
// }
//
// loop {
// let (socket, _) = server.accept().await.unwrap();
// let state = state.clone();
// tokio::spawn(async move {
// handle_client(socket, state).await;
// });
// }
// }
//
// async fn handle_client(client: TcpStream, state: SharedState) {
// let address = state.lock().unwrap().addr();
// let server = TcpStream::connect(address.as_str()).await.unwrap();
//
// let (mut eread, mut ewrite) = client.into_split();
// let (mut oread, mut owrite) = server.into_split();
//
// tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await });
// tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await });
// }
//
// async fn probe_upstream(probe_a: bool, state: SharedState) {
// loop {
// let address = match probe_a {
// true => state.lock().unwrap().a.clone(),
// false => state.lock().unwrap().b.clone(),
// };
// let upstream = timeout(
// Duration::from_millis(1000),
// TcpStream::connect(address.as_str())
// ).await;
// match upstream {
// Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a),
// _ => state.lock().unwrap().mark_down(probe_a),
// }
// sleep(Duration::from_millis(1000)).await;
// }
// }
\ No newline at end of file
use tokio::io; fn main() {}
use tokio::net::TcpListener; \ No newline at end of file
use tokio::net::TcpStream;
use std::sync::{Arc, Mutex};
use signal_hook::{consts::SIGUSR1, iterator::Signals};
use std::time::Duration;
use tokio::time::{sleep, timeout};
struct State {
a_up: bool,
b_up: bool,
a: String,
b: String,
prefer_a: bool
}
impl State {
fn use_a(&self) -> bool {
match (self.a_up, self.b_up) {
(true, true) => self.prefer_a,
(false, true) => false,
(true, false) => true,
(false, false) => self.prefer_a,
}
}
fn addr(&self) -> String {
if self.use_a() {
self.a.clone()
} else {
self.b.clone()
}
}
fn mark_up(&mut self, probe_a: bool) {
if probe_a && !self.a_up {
self.a_up = true;
println!("Marked a as up")
} else if !probe_a && !self.b_up {
self.b_up = true;
println!("Marked b as up")
}
}
fn mark_down(&mut self, probe_a: bool) {
if probe_a && self.a_up {
self.a_up = false;
println!("Marked a as down")
} else if !probe_a && self.b_up {
self.b_up = false;
println!("Marked b as down")
}
}
}
type SharedState = Arc<Mutex<State>>;
#[tokio::main]
async fn main() {
let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
let state: SharedState = Arc::new(Mutex::new(State {
a_up: false,
b_up: false,
prefer_a: true,
a: "localhost:8001".to_string(),
b: "localhost:8002".to_string(),
}));
{
let state = state.clone();
tokio::spawn(async move {
let mut signals = Signals::new([SIGUSR1]).unwrap();
for _ in signals.forever() {
let mut state = state.lock().unwrap();
state.prefer_a = !state.prefer_a;
println!("Update to server preference: prefer_a={:?}", state.prefer_a)
}
});
}
{
let state = state.clone();
tokio::spawn(async move {
probe_upstream(true, state).await
});
}
{
let state = state.clone();
tokio::spawn(async move {
probe_upstream(false, state).await
});
}
loop {
let (socket, _) = server.accept().await.unwrap();
let state = state.clone();
tokio::spawn(async move {
handle_client(socket, state).await;
});
}
}
async fn handle_client(client: TcpStream, state: SharedState) {
let address = state.lock().unwrap().addr();
let server = TcpStream::connect(address.as_str()).await.unwrap();
let (mut eread, mut ewrite) = client.into_split();
let (mut oread, mut owrite) = server.into_split();
tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await });
tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await });
}
async fn probe_upstream(probe_a: bool, state: SharedState) {
loop {
let address = match probe_a {
true => state.lock().unwrap().a.clone(),
false => state.lock().unwrap().b.clone(),
};
let upstream = timeout(
Duration::from_millis(1000),
TcpStream::connect(address.as_str())
).await;
match upstream {
Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a),
_ => state.lock().unwrap().mark_down(probe_a),
}
sleep(Duration::from_millis(1000)).await;
}
}
\ No newline at end of file
use super::{JunctionState, UpstreamState};
#[test]
fn test_current_single() {
let state = JunctionState {
preference: 0,
upstreams: vec![
UpstreamState {
address: "server".to_string(),
is_up: true
}
]
};
assert_eq!(state.current(), "server");
}
#[test]
fn test_current_replica() {
let mut state = JunctionState {
preference: 0,
upstreams: vec![
UpstreamState {
address: "server-0".to_string(),
is_up: true
},
UpstreamState {
address: "server-1".to_string(),
is_up: true
}
]
};
/////////////////////////////////////////////////////
state.preference = 0;
// UP -- up
state.upstreams[0].is_up = true;
state.upstreams[1].is_up = true;
assert_eq!(state.current(), "server-0");
// DOWN -- up
state.upstreams[0].is_up = false;
state.upstreams[1].is_up = true;
assert_eq!(state.current(), "server-1");
// UP -- down
state.upstreams[0].is_up = true;
state.upstreams[1].is_up = false;
assert_eq!(state.current(), "server-0");
// DOWN -- down
state.upstreams[0].is_up = false;
state.upstreams[1].is_up = false;
assert_eq!(state.current(), "server-0");
/////////////////////////////////////////////////////
state.preference = 1;
// up -- UP
state.upstreams[0].is_up = true;
state.upstreams[1].is_up = true;
assert_eq!(state.current(), "server-1");
// down -- UP
state.upstreams[0].is_up = false;
state.upstreams[1].is_up = true;
assert_eq!(state.current(), "server-1");
// up -- DOWN
state.upstreams[0].is_up = true;
state.upstreams[1].is_up = false;
assert_eq!(state.current(), "server-0");
// down -- DOWN
state.upstreams[0].is_up = false;
state.upstreams[1].is_up = false;
assert_eq!(state.current(), "server-1");
}
#[test]
fn test_advance() {
let mut state = JunctionState {
preference: 0,
upstreams: vec![
UpstreamState {
address: "server-0".to_string(),
is_up: true
},
UpstreamState {
address: "server-1".to_string(),
is_up: true
}
]
};
assert_eq!(state.preference, 0);
state.advance();
assert_eq!(state.preference, 1);
state.advance();
assert_eq!(state.preference, 0);
}
#[test]
fn test_mark() {
let mut state = JunctionState {
preference: 0,
upstreams: vec![
UpstreamState {
address: "server-0".to_string(),
is_up: true
},
UpstreamState {
address: "server-1".to_string(),
is_up: true
}
]
};
let changed = state.mark(0, false);
assert_eq!(state.upstreams[0].is_up, false);
assert_eq!(state.upstreams[1].is_up, true);
assert_eq!(changed, true);
let changed = state.mark(0, false);
assert_eq!(changed, false);
state.mark(1, false);
assert_eq!(state.upstreams[0].is_up, false);
assert_eq!(state.upstreams[1].is_up, false);
state.mark(0, true);
assert_eq!(state.upstreams[0].is_up, true);
assert_eq!(state.upstreams[1].is_up, false);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment