From a9e6c994907f35dcc7bec842b7e0eb6c65bfe0a9 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Fri, 10 Jan 2025 10:57:40 +0200 Subject: [PATCH] Add flag --- Cargo.lock | 19 ++- server/Cargo.toml | 8 + server/src/feature_cache.rs | 61 ++++++++ .../src/http/refresher/feature_refresher.rs | 145 ++++++++++++++++++ server/src/http/unleash_client.rs | 8 + server/src/urls.rs | 5 + 6 files changed, 244 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ae55f7b..ebdc92ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4589,7 +4589,7 @@ dependencies = [ "tracing-subscriber", "tracing-test", "ulid", - "unleash-types", + "unleash-types 0.15.2", "unleash-yggdrasil", "utoipa", "utoipa-swagger-ui", @@ -4600,6 +4600,18 @@ name = "unleash-types" version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ead505b176bd504c31815a4804d305789c3cbd6f89e932d118cd0830a955f0" +dependencies = [ + "chrono", + "derive_builder", + "serde", + "serde_json", +] + +[[package]] +name = "unleash-types" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14bf22b052d349e647b8e97043562c0b87f93d63b4944b72dd7b56acb8ac3edf" dependencies = [ "base64 0.22.1", "chrono", @@ -4629,7 +4641,7 @@ dependencies = [ "semver", "serde", "serde_json", - "unleash-types", + "unleash-types 0.14.0", ] [[package]] @@ -5118,6 +5130,7 @@ name = "xxhash-rust" version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" +<<<<<<< Updated upstream [[package]] name = "yoke" @@ -5142,6 +5155,8 @@ dependencies = [ "syn 2.0.87", "synstructure", ] +======= +>>>>>>> Stashed changes [[package]] name = "zerocopy" diff --git a/server/Cargo.toml b/server/Cargo.toml index a9075b09..34ec6558 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -91,11 +91,19 @@ tokio = { version = "1.42.0", features = [ "fs", ] } tokio-stream = { version = "0.1.17" } +<<<<<<< Updated upstream tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] } ulid = "1.1.3" unleash-types = { version = "0.15.3", features = ["openapi", "hashes"] } unleash-yggdrasil = { version = "0.14.5" } +======= +tracing = { version = "0.1.40", features = ["log"] } +tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } +ulid = "1.1.2" +unleash-types = { version = "0.15.2", features = ["openapi", "hashes"] } +unleash-yggdrasil = { version = "0.14.1" } +>>>>>>> Stashed changes utoipa = { version = "5", features = ["actix_extras", "chrono"] } utoipa-swagger-ui = { version = "8", features = ["actix-web"] } [dev-dependencies] diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs index 3e1021e3..a7cd3d82 100644 --- a/server/src/feature_cache.rs +++ b/server/src/feature_cache.rs @@ -67,9 +67,15 @@ impl FeatureCache { self.send_full_update(key); } +<<<<<<< Updated upstream pub fn apply_delta(&self, key: String, delta: &ClientFeaturesDelta) { let client_features = ClientFeatures { version : 2, +======= + pub fn apply_delta(&self, key: String, token: &EdgeToken, delta: ClientFeaturesDelta) { + let client_features = ClientFeatures { + version : 1, +>>>>>>> Stashed changes features : delta.updated.clone(), segments: delta.segments.clone(), query: None, @@ -78,10 +84,35 @@ impl FeatureCache { self.features .entry(key.clone()) .and_modify(|existing_features| { +<<<<<<< Updated upstream existing_features.modify_in_place(delta); }) .or_insert(client_features); self.send_full_update(key); +======= + let updated = update_client_features_delta(token, existing_features, &delta); // TODO: is this replacing or merging the flags + *existing_features = updated; + }) + .or_insert(client_features); + self.send_full_update(key); + + // let mut current_state = self.compiled_state.take().unwrap_or_default(); + // let segment_map = build_segment_map(&delta.segments); + // let mut warnings: Vec = vec![]; + // for removed in delta.removed.clone() { + // current_state.remove(&removed); + // } + // for update in delta.updated.clone() { + // let updated_state = compile(&update, &segment_map, &mut warnings); + // current_state.insert(update.name.clone(), updated_state); + // } + // self.compiled_state = Some(current_state); + // if warnings.is_empty() { + // None + // } else { + // Some(warnings) + // } +>>>>>>> Stashed changes } pub fn is_empty(&self) -> bool { @@ -116,7 +147,37 @@ fn update_client_features( s }), query: old.query.clone().or(update.query.clone()), +<<<<<<< Updated upstream meta: old.meta.clone().or(update.meta.clone()), +======= + meta: None, + } +} + +fn update_client_features_delta( + token: &EdgeToken, + old: &ClientFeatures, + delta: &ClientFeaturesDelta, +) -> ClientFeatures { + let mut updated_features = + update_projects_from_feature_update(token, &old.features, &delta.updated); + + for removed_feature in &delta.removed { + updated_features.retain(|feature| feature.name != *removed_feature); + } + updated_features.sort(); + + let segments = merge_segments_update(old.segments.clone(), delta.segments.clone()); + ClientFeatures { + version: 1, + features: updated_features, + segments: segments.map(|mut s| { + s.sort(); + s + }), + query: None, + meta: None, +>>>>>>> Stashed changes } } diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index 3f647937..047562bd 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -8,7 +8,11 @@ use eventsource_client::Client; use futures::TryStreamExt; use reqwest::StatusCode; use tracing::{debug, info, warn}; +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs use unleash_types::client_features::{ClientFeatures}; +======= +use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; use unleash_yggdrasil::EngineState; @@ -18,9 +22,13 @@ use crate::filters::{filter_client_features, FeatureFilterSet}; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs use crate::types::{ build, EdgeResult, TokenType, TokenValidationStatus, }; +======= +use crate::types::{build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus}; +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs use crate::{ persistence::EdgePersistence, tokens::{cache_key, simplify}, @@ -50,8 +58,13 @@ pub struct FeatureRefresher { pub persistence: Option>, pub strict: bool, pub streaming: bool, +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs pub client_meta_information: ClientMetaInformation, pub delta: bool, +======= + pub delta: bool, + pub app_name: String, +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs } impl Default for FeatureRefresher { @@ -65,8 +78,13 @@ impl Default for FeatureRefresher { persistence: None, strict: true, streaming: false, +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs client_meta_information: Default::default(), delta: false, +======= + delta: false, + app_name: "unleash_edge".into(), +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs } } } @@ -103,8 +121,13 @@ pub enum FeatureRefresherMode { pub struct FeatureRefreshConfig { features_refresh_interval: chrono::Duration, mode: FeatureRefresherMode, +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs client_meta_information: ClientMetaInformation, delta: bool, +======= + delta: bool, + app_name: String, +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs } impl FeatureRefreshConfig { @@ -140,8 +163,13 @@ impl FeatureRefresher { persistence, strict: config.mode != FeatureRefresherMode::Dynamic, streaming: config.mode == FeatureRefresherMode::Streaming, +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs client_meta_information: config.client_meta_information, delta: config.delta, +======= + delta : config.delta, + app_name: config.app_name, +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs } } @@ -412,7 +440,11 @@ impl FeatureRefresher { pub async fn hydrate_new_tokens(&self) { let hydrations = self.get_tokens_never_refreshed(); for hydration in hydrations { +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs if cfg!(feature = "delta") && self.delta { +======= + if self.delta { +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs self.refresh_single_delta(hydration).await; } else { self.refresh_single(hydration).await; @@ -422,12 +454,19 @@ impl FeatureRefresher { pub async fn refresh_features(&self) { let refreshes = self.get_tokens_due_for_refresh(); for refresh in refreshes { +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs if cfg!(feature = "delta") && self.delta { +======= + if self.delta { +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs self.refresh_single_delta(refresh).await; } else { self.refresh_single(refresh).await; } +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs +======= +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs } } @@ -465,6 +504,45 @@ impl FeatureRefresher { new_state }); } +<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs +======= + + async fn handle_client_features_delta_updated( + &self, + refresh_token: &EdgeToken, + features: ClientFeaturesDelta, + etag: Option, + ) { + debug!("Got updated client features delta. Updating features with {etag:?}"); + let key = cache_key(refresh_token); + self.update_last_refresh(refresh_token, etag, features.updated.len()); /// TODO: why we need to set updated here + self.features_cache + .modify(key.clone(), refresh_token, features.clone()); + self.engine_cache + .entry(key.clone()) + .and_modify(|engine| { + if let Some(f) = self.features_cache.get(&key) { + let mut new_state = EngineState::default(); + let warnings = new_state.take_state(f.clone()); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + *engine = new_state; + + } + }) + .or_insert_with(|| { + let mut new_state = EngineState::default(); + + let warnings = new_state.take_state(features); + if let Some(warnings) = warnings { + warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}"); + }; + new_state + }); + } + +>>>>>>> Stashed changes:server/src/http/feature_refresher.rs pub async fn refresh_single(&self, refresh: TokenRefresh) { let features_result = self .unleash_client @@ -531,6 +609,73 @@ impl FeatureRefresher { } } } + + pub async fn refresh_single_delta(&self, refresh: TokenRefresh) { + let features_result = self + .unleash_client + .get_client_features_delta(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }) + .await; + + match features_result { + Ok(feature_response) => match feature_response { + ClientFeaturesDeltaResponse::NoUpdate(tag) => { + debug!("No update needed. Will update last check time with {tag}"); + self.update_last_check(&refresh.token.clone()); + } + ClientFeaturesDeltaResponse::Updated(features, etag) => { + self.handle_client_features_updated(&refresh.token, features, etag) + .await + } + }, + Err(e) => { + match e { + EdgeError::ClientFeaturesFetchError(fe) => { + match fe { + FeatureError::Retriable(status_code) => match status_code { + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => { + info!("Upstream is having some problems, increasing my waiting period"); + self.backoff(&refresh.token); + } + StatusCode::TOO_MANY_REQUESTS => { + info!("Got told that upstream is receiving too many requests"); + self.backoff(&refresh.token); + } + _ => { + info!("Couldn't refresh features, but will retry next go") + } + }, + FeatureError::AccessDenied => { + info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks"); + self.tokens_to_refresh.remove(&refresh.token.token); + if !self.tokens_to_refresh.iter().any(|e| { + e.value().token.environment == refresh.token.environment + }) { + let cache_key = cache_key(&refresh.token); + // No tokens left that access the environment of our current refresh. Deleting client features and engine cache + self.features_cache.remove(&cache_key); + self.engine_cache.remove(&cache_key); + } + } + FeatureError::NotFound => { + info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again"); + self.backoff(&refresh.token); + } + } + } + EdgeError::ClientCacheError => { + info!("Couldn't refresh features, but will retry next go") + } + _ => info!("Couldn't refresh features: {e:?}. Will retry next pass"), + } + } + } + } pub fn backoff(&self, token: &EdgeToken) { self.tokens_to_refresh .alter(&token.token, |_k, old_refresh| { diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs index 44349a58..ff1d2bd1 100644 --- a/server/src/http/unleash_client.rs +++ b/server/src/http/unleash_client.rs @@ -26,8 +26,12 @@ use crate::http::headers::{ use crate::metrics::client_metrics::MetricsBatch; use crate::tls::build_upstream_certificate; use crate::types::{ +<<<<<<< Updated upstream ClientFeaturesDeltaResponse, ClientFeaturesResponse, EdgeResult, EdgeToken, TokenValidationStatus, ValidateTokensRequest, +======= + ClientFeaturesResponse, ClientFeaturesDeltaResponse, EdgeResult, EdgeToken, TokenValidationStatus, ValidateTokensRequest, +>>>>>>> Stashed changes }; use crate::urls::UnleashUrls; use crate::{error::EdgeError, types::ClientFeaturesRequest}; @@ -48,7 +52,11 @@ lazy_static! { vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0] ) .unwrap(); +<<<<<<< Updated upstream pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!( +======= + pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!( +>>>>>>> Stashed changes "client_feature_delta_fetch", "Timings for fetching feature deltas in milliseconds", &["status_code"], diff --git a/server/src/urls.rs b/server/src/urls.rs index 12336c4c..a7f8ef8d 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -125,8 +125,13 @@ mod tests { #[test_case("https://app.unleash-hosted.com/demo", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "No trailing slash, https protocol")] #[test_case("https://app.unleash-hosted.com/demo/", "https://app.unleash-hosted.com/demo/api", "https://app.unleash-hosted.com/demo/api/client", "https://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, https protocol")] +<<<<<<< Updated upstream #[test_case("http://app.unleash-hosted.com/demo/", "http://app.unleash-hosted.com/demo/api", "http://app.unleash-hosted.com/demo/api/client", "http://app.unleash-hosted.com/demo/api/client/features", "http://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, http protocol")] #[test_case("http://app.unleash-hosted.com/", "http://app.unleash-hosted.com/api", "http://app.unleash-hosted.com/api/client", "http://app.unleash-hosted.com/api/client/features", "http://app.unleash-hosted.com/api/client/delta" ; "One trailing slash, no subpath, http protocol")] +======= + #[test_case("http://app.unleash-hosted.com/demo/", "http://app.unleash-hosted.com/demo/api", "http://app.unleash-hosted.com/demo/api/client", "http://app.unleash-hosted.com/demo/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, http protocol")] + #[test_case("http://app.unleash-hosted.com/", "http://app.unleash-hosted.com/api", "http://app.unleash-hosted.com/api/client", "http://app.unleash-hosted.com/api/client/features", "https://app.unleash-hosted.com/demo/api/client/delta" ; "One trailing slash, no subpath, http protocol")] +>>>>>>> Stashed changes pub fn can_handle_base_urls( base_url: &str, api_url: &str,