Skip to content

Commit

Permalink
Move handle out of monitor
Browse files Browse the repository at this point in the history
Partially revert "5d60ca3c75049bae668c56f01db66aea5454572a"
  • Loading branch information
sorz committed Aug 15, 2018
1 parent bad153c commit 310b759
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ fn main() {
panic!("missing server list");
}
info!("total {} server(s) added", servers.len());
let monitor = Monitor::new(servers, graphite);

let mut lp = Core::new().expect("fail to create event loop");
let handle = lp.handle();
let monitor = Monitor::new(servers, graphite, handle.clone());

let mut sock_file = None;
if let Some(http_addr) = args.value_of("web-bind") {
Expand Down Expand Up @@ -115,7 +115,7 @@ fn main() {
.expect("fail to set tcp congestion algorithm. \
check tcp_allowed_congestion_control?");
}
handle.spawn(monitor.monitor_delay(probe));
handle.spawn(monitor.monitor_delay(probe, handle.clone()));
let shared_buf = SharedBuf::new(8192);
let server = listener.incoming().for_each(move |(sock, addr)| {
debug!("incoming {}", addr);
Expand Down
46 changes: 18 additions & 28 deletions src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::{Instant, Duration, SystemTime};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use rand::{self, Rng};
use tokio_core::reactor::{Handle, Timeout, Remote};
use tokio_core::reactor::{Handle, Timeout};
use tokio_core::net::TcpStream;
use tokio_io::io::{read_exact, write_all};
use futures::{future, Future, IntoFuture};
Expand All @@ -29,13 +29,11 @@ pub struct Monitor {
servers: Arc<Mutex<ServerList>>,
meters: Arc<Mutex<HashMap<Arc<ProxyServer>, Meter>>>,
graphite: Option<SocketAddr>,
remote: Remote,
}

impl Monitor {
pub fn new(servers: Vec<ProxyServer>,
graphite: Option<SocketAddr>,
handle: Handle) -> Monitor {
graphite: Option<SocketAddr>) -> Monitor {
let servers: Vec<_> = servers.into_iter()
.map(|server| Arc::new(server))
.collect();
Expand All @@ -45,7 +43,6 @@ impl Monitor {
Monitor {
servers: Arc::new(Mutex::new(servers)),
meters: Arc::new(Mutex::new(meters)),
remote: handle.remote().clone(),
graphite,
}
}
Expand All @@ -64,36 +61,30 @@ impl Monitor {
debug!("scores:{}", info_stats(&*self.servers.lock().unwrap()));
}

fn handle(&self) -> Handle {
self.remote.handle()
.expect("Should run in the same thread.")
}

/// Start monitoring delays.
/// Returned Future won't return unless error on timer.
pub fn monitor_delay(&self, probe: u64)
pub fn monitor_delay(&self, probe: u64, handle: Handle)
-> impl Future<Item=(), Error=()> {
let init = test_all(self.clone(), true);
let init = test_all(self.clone(), true, handle);
let interval = Duration::from_secs(probe);
let handle = self.handle();
init.and_then(move |monitor| {
future::loop_fn(monitor, move |monitor| {
let wait = Timeout::new(interval, &handle)
init.and_then(move |(mon, hd)| {
future::loop_fn((mon, hd), move |(mon, hd)| {
let wait = Timeout::new(interval, &hd)
.expect("error on get timeout from reactor")
.map_err(|err| panic!("error on timer: {}", err));
wait.and_then(move |_| {
test_all(monitor, false).and_then(send_metrics)
test_all(mon, false, hd)
.and_then(|(mon, hd)| send_metrics(mon, hd))
}).and_then(|args| Ok(Loop::Continue(args)))
})
}).map_err(|_| ())
}

/// Start monitoring throughput.
/// Returned Future won't return unless error on timer.
pub fn monitor_throughput(&self)
pub fn monitor_throughput(&self, handle: Handle)
-> impl Future<Item=(), Error=()> {
let interval = Duration::from_secs(THROUGHPUT_INTERVAL_SECS);
let handle = self.handle();
future::loop_fn(self.clone(), move |monitor| {
for (server, meter) in monitor.meters.lock().unwrap().iter_mut() {
meter.add_sample(server.traffic());
Expand Down Expand Up @@ -126,12 +117,12 @@ fn info_stats(infos: &ServerList) -> String {
stats
}

fn test_all(monitor: Monitor, init: bool)
-> impl Future<Item=Monitor, Error=()> {
fn test_all(monitor: Monitor, init: bool, handle: Handle)
-> impl Future<Item=(Monitor, Handle), Error=()> {
debug!("testing all servers...");
let handle = monitor.handle();
let handle_ = handle.clone();
let tests: Vec<_> = monitor.servers().into_iter().map(move |server| {
let test = alive_test(&server, &handle).then(move |result| {
let test = alive_test(&server, &handle_).then(move |result| {
if init {
server.set_delay(result.ok());
} else {
Expand All @@ -144,15 +135,14 @@ fn test_all(monitor: Monitor, init: bool)

future::join_all(tests).map(move |_| {
monitor.resort();
monitor
(monitor, handle)
})
}

// send graphite metrics if need
fn send_metrics(monitor: Monitor)
-> impl Future<Item=Monitor, Error=()> {
fn send_metrics(monitor: Monitor, handle: Handle)
-> impl Future<Item=(Monitor, Handle), Error=()> {
if let Some(ref addr) = monitor.graphite {
let handle = monitor.handle();
let servers = monitor.servers();
let now = Some(SystemTime::now());
let records = servers.iter().flat_map(|server| {
Expand Down Expand Up @@ -189,7 +179,7 @@ fn send_metrics(monitor: Monitor)
Box::new(send) as Box<Future<Item=(), Error=()>>
} else {
Box::new(Ok(()).into_future()) as Box<Future<Item=(), Error=()>>
}.map(|()| monitor)
}.map(|()| (monitor, handle))
}

fn alive_test(server: &ProxyServer, handle: &Handle)
Expand Down
2 changes: 1 addition & 1 deletion src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub fn run_server<I, S, A>(incoming: I, monitor: Monitor, handle: &Handle)
where I: Stream<Item=(S, A), Error=io::Error> + 'static,
S: AsyncRead + AsyncWrite + Send + 'static,
A: Debug {
handle.spawn(monitor.monitor_throughput());
handle.spawn(monitor.monitor_throughput(handle.clone()));
let start_time = Instant::now();

let new_service = move || {
Expand Down

0 comments on commit 310b759

Please sign in to comment.