From 361829999135ab59b905e3c456f4c2f74198af13 Mon Sep 17 00:00:00 2001
From: Christopher Kolstad
Date: Mon, 27 Nov 2023 13:22:06 +0100
Subject: [PATCH] fix: use chrono durations and make jitter random 0..5 seconds instead of 0..5000 ms (#350)

---
 server/src/http/background_send_metrics.rs | 56 +++++++++++++---------
 server/src/main.rs                         |  2 +-
 2 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/server/src/http/background_send_metrics.rs b/server/src/http/background_send_metrics.rs
index 4fd1cf23..db449dbc 100644
--- a/server/src/http/background_send_metrics.rs
+++ b/server/src/http/background_send_metrics.rs
@@ -3,12 +3,12 @@ use std::cmp::max;
 use tracing::{error, info, trace, warn};

 use super::unleash_client::UnleashClient;
-use std::time::Duration;

 use crate::{
     error::EdgeError,
     metrics::client_metrics::{size_of_batch, MetricsCache},
 };
+use chrono::Duration;
 use lazy_static::lazy_static;
 use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
 use rand::Rng;
@@ -30,10 +30,10 @@ lazy_static! {
 pub async fn send_metrics_task(
     metrics_cache: Arc<MetricsCache>,
     unleash_client: Arc<UnleashClient>,
-    send_interval: u64,
+    send_interval: i64,
 ) {
     let mut failures = 0;
-    let mut interval = Duration::from_secs(send_interval);
+    let mut interval = Duration::seconds(send_interval);
     loop {
         let batches = metrics_cache.get_appropriately_sized_batches();
         trace!("Posting {} batches", batches.len());
@@ -55,20 +55,20 @@
                             }
                             StatusCode::NOT_FOUND => {
                                 failures = 10;
-                                interval = new_interval(send_interval, failures, 5);
-                                error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.as_secs());
+                                interval = new_interval(interval, failures, 5);
+                                error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.num_seconds());
                             }
                             StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
                                 failures = 10;
-                                interval = new_interval(send_interval, failures, 5);
-                                error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.as_secs());
+                                interval = new_interval(interval, failures, 5);
+                                error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.num_seconds());
                             }
                             StatusCode::TOO_MANY_REQUESTS => {
                                 failures = max(10, failures + 1);
-                                interval = new_interval(send_interval, failures, 5);
+                                interval = new_interval(interval, failures, 5);
                                 info!(
                                     "Upstream said it was too busy, backing off to {} seconds",
-                                    interval.as_secs()
+                                    interval.num_seconds()
                                 );
                                 metrics_cache.reinsert_batch(batch);
                             }
@@ -77,8 +77,8 @@
                             | StatusCode::SERVICE_UNAVAILABLE
                             | StatusCode::GATEWAY_TIMEOUT => {
                                 failures = max(10, failures + 1);
-                                interval = new_interval(send_interval, failures, 5);
-                                info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.as_secs());
+                                interval = new_interval(interval, failures, 5);
+                                info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.num_seconds());
                                 metrics_cache.reinsert_batch(batch);
                             }
                             _ => {
@@ -94,27 +94,37 @@
                     }
                 }
             } else {
                 failures = max(0, failures - 1);
-                interval = new_interval(send_interval, failures, 5);
+                interval = new_interval(interval, failures, 5);
             }
         }
         trace!(
             "Done posting traces. Sleeping for {} seconds and then going again",
-            interval.as_secs()
+            interval.num_seconds()
         );
-        tokio::time::sleep(interval).await;
+        tokio::time::sleep(std::time::Duration::from_secs(interval.num_seconds() as u64)).await;
     }
 }

-fn new_interval(send_interval: u64, failures: u64, max_jitter_seconds: u64) -> Duration {
-    let initial = Duration::from_secs(send_interval);
-    let added_interval_from_failure = Duration::from_secs(send_interval * failures);
-    let jitter = random_jitter_milliseconds(max_jitter_seconds);
-    initial + added_interval_from_failure + jitter
+fn new_interval(send_interval: Duration, failures: i32, max_jitter_seconds: u8) -> Duration {
+    let added_interval_from_failure = send_interval * failures;
+    let jitter = random_jitter_seconds(max_jitter_seconds);
+    send_interval + added_interval_from_failure + jitter
 }

-fn random_jitter_milliseconds(max_jitter_seconds: u64) -> Duration {
-    let mut rng = rand::thread_rng();
-    let jitter = rng.gen_range(0..(max_jitter_seconds * 1000));
-    Duration::from_millis(jitter)
+fn random_jitter_seconds(max_jitter_seconds: u8) -> Duration {
+    let jitter = rand::thread_rng().gen_range(0..max_jitter_seconds);
+    Duration::seconds(jitter as i64)
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::http::background_send_metrics::new_interval;
+    use chrono::Duration;
+
+    #[test]
+    pub fn new_interval_does_not_overflow() {
+        let metrics = new_interval(Duration::seconds(300), 10, 5);
+        assert!(metrics.num_seconds() < 3305);
+    }
 }
diff --git a/server/src/main.rs b/server/src/main.rs
index db09adad..9a8dd27d 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -154,7 +154,7 @@ async fn main() -> Result<(), anyhow::Error> {
         _ = refresher.start_refresh_features_background_task() => {
             tracing::info!("Feature refresher unexpectedly shut down");
         }
-        _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.unleash_client.clone(), edge.metrics_interval_seconds) => {
+        _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.unleash_client.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => {
             tracing::info!("Metrics poster unexpectedly shut down");
         }
         _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone(), refresher.tokens_to_refresh.clone()) => {
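
Note for reviewers (illustration only, not part of the patch): the reworked backoff is send_interval + send_interval * failures + jitter, where the jitter is now a whole number of seconds drawn from 0..max_jitter_seconds (exclusive upper bound) instead of 0..max_jitter_seconds * 1000 milliseconds. A minimal standalone sketch, assuming only the chrono and rand crates the patch already uses, mirroring the patched new_interval shape and reproducing the bound asserted by the new test:

    use chrono::Duration;
    use rand::Rng;

    // Same shape as the patched new_interval/random_jitter_seconds above.
    fn new_interval(send_interval: Duration, failures: i32, max_jitter_seconds: u8) -> Duration {
        let added_interval_from_failure = send_interval * failures;
        let jitter = Duration::seconds(rand::thread_rng().gen_range(0..max_jitter_seconds) as i64);
        send_interval + added_interval_from_failure + jitter
    }

    fn main() {
        // 300 second base interval, 10 recorded failures, jitter capped at 5:
        // worst case is 300 + 3000 + 4 = 3304 seconds, hence the `< 3305` assertion.
        let backoff = new_interval(Duration::seconds(300), 10, 5);
        assert!(backoff.num_seconds() < 3305);
        println!("next metrics post in {} seconds", backoff.num_seconds());
    }

Because gen_range(0..max_jitter_seconds) uses an exclusive upper bound, the jitter is at most max_jitter_seconds - 1 whole seconds; the removed millisecond-based version drew from the same 0..5 second window, only at millisecond granularity.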