From 0f250c2c6d7d2083a822b06f7b42eea26ef34db8 Mon Sep 17 00:00:00 2001
From: Frank Sauerburger <frank@sauerburger.com>
Date: Wed, 27 Nov 2024 22:22:09 +0100
Subject: [PATCH] Implement and test k-way JunctionState

---
 src/lib.rs  | 123 ++++++++++++++++++++++++++++++++++++++++++++++
 src/main.rs | 134 +-------------------------------------------------
 src/test.rs | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 262 insertions(+), 133 deletions(-)
 create mode 100644 src/lib.rs
 create mode 100644 src/test.rs

diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..00b0cdf
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,123 @@
+
+// use tokio::io;
+// use tokio::net::TcpListener;
+// use tokio::net::TcpStream;
+// use std::sync::{Arc, Mutex};
+// use signal_hook::{consts::SIGUSR1, iterator::Signals};
+// use std::time::Duration;
+// use tokio::time::{sleep, timeout};
+
+struct UpstreamState {
+    is_up: bool,
+    address: String,
+}
+
+struct JunctionState {
+    upstreams: Vec<UpstreamState>,
+    preference: usize,
+}
+
+impl JunctionState {
+    fn current(&self) -> String {
+        for offset in 0..self.upstreams.len() {
+            let idx = (self.preference + offset) % self.upstreams.len();
+            if self.upstreams[idx].is_up {
+                return self.upstreams[idx].address.clone();
+            }
+        }
+        self.upstreams[self.preference].address.clone()  // all upstreams are down
+    }
+
+    fn mark(&mut self, idx: usize, is_up: bool) -> bool {
+        if self.upstreams[idx].is_up != is_up {
+            self.upstreams[idx].is_up = is_up;
+            return true;
+        }
+        false
+    }
+
+    fn advance(&mut self) {
+        self.preference = (self.preference + 1) % self.upstreams.len();
+    }
+}
+
+#[cfg(test)]
+mod test;
+
+// type SharedState = Arc<Mutex<State>>;
+//
+// #[tokio::main]
+// async fn main() {
+//     let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
+//
+//     let state: SharedState = Arc::new(Mutex::new(State {
+//         a_up: false,
+//         b_up: false,
+//         prefer_a: true,
+//         a: "localhost:8001".to_string(),
+//         b: "localhost:8002".to_string(),
+//     }));
+//
+//
+//     {
+//         let state = state.clone();
+//         tokio::spawn(async move {
+//             let mut signals = Signals::new([SIGUSR1]).unwrap();
+//             for _ in signals.forever() {
+//                 let mut state = state.lock().unwrap();
+//                 state.prefer_a = !state.prefer_a;
+//                 println!("Update to server preference: prefer_a={:?}", state.prefer_a)
+//             }
+//         });
+//     }
+//
+//     {
+//         let state = state.clone();
+//         tokio::spawn(async move {
+//             probe_upstream(true, state).await
+//         });
+//     }
+//     {
+//         let state = state.clone();
+//         tokio::spawn(async move {
+//             probe_upstream(false, state).await
+//         });
+//     }
+//
+//     loop {
+//         let (socket, _) = server.accept().await.unwrap();
+//         let state = state.clone();
+//         tokio::spawn(async move {
+//             handle_client(socket, state).await;
+//         });
+//     }
+// }
+//
+// async fn handle_client(client: TcpStream, state: SharedState) {
+//     let address = state.lock().unwrap().addr();
+//     let server = TcpStream::connect(address.as_str()).await.unwrap();
+//
+//     let (mut eread, mut ewrite) = client.into_split();
+//     let (mut oread, mut owrite) = server.into_split();
+//
+//     tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await });
+//     tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await });
+// }
+//
+// async fn probe_upstream(probe_a: bool, state: SharedState) {
+//     loop {
+//         let address = match probe_a {
+//             true => state.lock().unwrap().a.clone(),
+//             false => state.lock().unwrap().b.clone(),
+//         };
+//         let upstream  = timeout(
+//             Duration::from_millis(1000),
+//             TcpStream::connect(address.as_str())
+//         ).await;
+//         match upstream  {
+//             Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a),
+//             _ => state.lock().unwrap().mark_down(probe_a),
+//         }
+//         sleep(Duration::from_millis(1000)).await;
+//     }
+// }
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index 5b3d8a7..e71fdf5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,133 +1 @@
-use tokio::io;
-use tokio::net::TcpListener;
-use tokio::net::TcpStream;
-use std::sync::{Arc, Mutex};
-use signal_hook::{consts::SIGUSR1, iterator::Signals};
-use std::time::Duration;
-use tokio::time::{sleep, timeout};
-
-struct State {
-    a_up: bool,
-    b_up: bool,
-    a: String,
-    b: String,
-    prefer_a: bool
-}
-
-
-impl State {
-    fn use_a(&self) -> bool {
-        match (self.a_up, self.b_up) {
-            (true, true) => self.prefer_a,
-            (false, true) => false,
-            (true, false) => true,
-            (false, false) => self.prefer_a,
-        }
-    }
-
-    fn addr(&self) -> String {
-        if self.use_a() {
-            self.a.clone()
-        } else {
-            self.b.clone()
-        }
-    }
-
-    fn mark_up(&mut self, probe_a: bool) {
-        if probe_a && !self.a_up {
-            self.a_up = true;
-            println!("Marked a as up")
-        } else if !probe_a && !self.b_up {
-            self.b_up = true;
-            println!("Marked b as up")
-        }
-    }
-
-    fn mark_down(&mut self, probe_a: bool) {
-        if probe_a && self.a_up {
-            self.a_up = false;
-            println!("Marked a as down")
-        } else if !probe_a && self.b_up {
-            self.b_up = false;
-            println!("Marked b as down")
-        }
-    }
-}
-
-type SharedState = Arc<Mutex<State>>;
-
-#[tokio::main]
-async fn main() {
-    let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
-
-    let state: SharedState = Arc::new(Mutex::new(State {
-        a_up: false,
-        b_up: false,
-        prefer_a: true,
-        a: "localhost:8001".to_string(),
-        b: "localhost:8002".to_string(),
-    }));
-
-
-    {
-        let state = state.clone();
-        tokio::spawn(async move {
-            let mut signals = Signals::new([SIGUSR1]).unwrap();
-            for _ in signals.forever() {
-                let mut state = state.lock().unwrap();
-                state.prefer_a = !state.prefer_a;
-                println!("Update to server preference: prefer_a={:?}", state.prefer_a)
-            }
-        });
-    }
-
-    {
-        let state = state.clone();
-        tokio::spawn(async move {
-            probe_upstream(true, state).await
-        });
-    }
-    {
-        let state = state.clone();
-        tokio::spawn(async move {
-            probe_upstream(false, state).await
-        });
-    }
-
-    loop {
-        let (socket, _) = server.accept().await.unwrap();
-        let state = state.clone();
-        tokio::spawn(async move {
-            handle_client(socket, state).await;
-        });
-    }
-}
-
-async fn handle_client(client: TcpStream, state: SharedState) {
-    let address = state.lock().unwrap().addr();
-    let server = TcpStream::connect(address.as_str()).await.unwrap();
-
-    let (mut eread, mut ewrite) = client.into_split();
-    let (mut oread, mut owrite) = server.into_split();
-
-    tokio::spawn(async move { io::copy(&mut eread, &mut owrite).await });
-    tokio::spawn(async move { io::copy(&mut oread, &mut ewrite).await });
-}
-
-async fn probe_upstream(probe_a: bool, state: SharedState) {
-    loop {
-        let address = match probe_a {
-            true => state.lock().unwrap().a.clone(),
-            false => state.lock().unwrap().b.clone(),
-        };
-        let upstream  = timeout(
-            Duration::from_millis(1000),
-            TcpStream::connect(address.as_str())
-        ).await;
-        match upstream  {
-            Ok(Ok(_)) => state.lock().unwrap().mark_up(probe_a),
-            _ => state.lock().unwrap().mark_down(probe_a),
-        }
-        sleep(Duration::from_millis(1000)).await;
-    }
-}
\ No newline at end of file
+fn main() {}
\ No newline at end of file
diff --git a/src/test.rs b/src/test.rs
new file mode 100644
index 0000000..4c8dec5
--- /dev/null
+++ b/src/test.rs
@@ -0,0 +1,138 @@
+
+use super::{JunctionState, UpstreamState};
+
+#[test]
+fn test_current_single() {
+   let state = JunctionState {
+       preference: 0,
+       upstreams: vec![
+           UpstreamState {
+               address: "server".to_string(),
+               is_up: true
+           }
+       ]
+   };
+
+    assert_eq!(state.current(), "server");
+}
+
+#[test]
+fn test_current_replica() {
+    let mut state = JunctionState {
+        preference: 0,
+        upstreams: vec![
+            UpstreamState {
+                address: "server-0".to_string(),
+                is_up: true
+            },
+            UpstreamState {
+                address: "server-1".to_string(),
+                is_up: true
+            }
+        ]
+    };
+
+    /////////////////////////////////////////////////////
+    state.preference = 0;
+
+    // UP -- up
+    state.upstreams[0].is_up = true;
+    state.upstreams[1].is_up = true;
+    assert_eq!(state.current(), "server-0");
+
+    // DOWN -- up
+    state.upstreams[0].is_up = false;
+    state.upstreams[1].is_up = true;
+    assert_eq!(state.current(), "server-1");
+
+    // UP -- down
+    state.upstreams[0].is_up = true;
+    state.upstreams[1].is_up = false;
+    assert_eq!(state.current(), "server-0");
+
+    // DOWN -- down
+    state.upstreams[0].is_up = false;
+    state.upstreams[1].is_up = false;
+    assert_eq!(state.current(), "server-0");
+
+    /////////////////////////////////////////////////////
+    state.preference = 1;
+
+    // up -- UP
+    state.upstreams[0].is_up = true;
+    state.upstreams[1].is_up = true;
+    assert_eq!(state.current(), "server-1");
+
+    // down -- UP
+    state.upstreams[0].is_up = false;
+    state.upstreams[1].is_up = true;
+    assert_eq!(state.current(), "server-1");
+
+    // up -- DOWN
+    state.upstreams[0].is_up = true;
+    state.upstreams[1].is_up = false;
+    assert_eq!(state.current(), "server-0");
+
+    // down -- DOWN
+    state.upstreams[0].is_up = false;
+    state.upstreams[1].is_up = false;
+    assert_eq!(state.current(), "server-1");
+}
+
+#[test]
+fn test_advance() {
+    let mut state = JunctionState {
+        preference: 0,
+        upstreams: vec![
+            UpstreamState {
+                address: "server-0".to_string(),
+                is_up: true
+            },
+            UpstreamState {
+                address: "server-1".to_string(),
+                is_up: true
+            }
+        ]
+    };
+
+    assert_eq!(state.preference, 0);
+
+    state.advance();
+    assert_eq!(state.preference, 1);
+
+    state.advance();
+    assert_eq!(state.preference, 0);
+}
+
+#[test]
+fn test_mark() {
+    let mut state = JunctionState {
+        preference: 0,
+        upstreams: vec![
+            UpstreamState {
+                address: "server-0".to_string(),
+                is_up: true
+            },
+            UpstreamState {
+                address: "server-1".to_string(),
+                is_up: true
+            }
+        ]
+    };
+
+    let changed = state.mark(0, false);
+    assert_eq!(state.upstreams[0].is_up, false);
+    assert_eq!(state.upstreams[1].is_up, true);
+    assert_eq!(changed, true);
+
+    let changed = state.mark(0, false);
+    assert_eq!(changed, false);
+
+    state.mark(1, false);
+    assert_eq!(state.upstreams[0].is_up, false);
+    assert_eq!(state.upstreams[1].is_up, false);
+
+    state.mark(0, true);
+    assert_eq!(state.upstreams[0].is_up, true);
+    assert_eq!(state.upstreams[1].is_up, false);
+}
-- 
GitLab