diff --git a/.gitignore b/.gitignore index 6f561791..6e2440e7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ debug/ *.ipr *.iws .vscode +mutants.out .project .classpath diff --git a/Cargo.lock b/Cargo.lock index 9bed1d43..9487e8a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,9 +447,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.5" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bitflags" @@ -2979,6 +2979,7 @@ dependencies = [ "ahash", "anyhow", "async-trait", + "base64", "chrono", "cidr", "clap", @@ -3008,6 +3009,7 @@ dependencies = [ "serde_json", "serde_qs", "shadow-rs", + "str-buf", "test-case", "testcontainers", "testcontainers-modules", @@ -3020,6 +3022,7 @@ dependencies = [ "unleash-yggdrasil", "utoipa", "utoipa-swagger-ui", + "xxhash-rust", ] [[package]] @@ -3503,9 +3506,9 @@ dependencies = [ [[package]] name = "xxhash-rust" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b" +checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61" [[package]] name = "zerocopy" diff --git a/server/Cargo.toml b/server/Cargo.toml index fe9fad19..557c144b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -30,6 +30,7 @@ ahash = "0.8.7" anyhow = "1.0.79" async-trait = "0.1.77" +base64 = "0.21.7" chrono = { version = "0.4.31", features = ["serde"] } cidr = "0.2.2" clap = { version = "4.4.16", features = ["derive", "env"] } @@ -70,6 +71,7 @@ serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" serde_qs = { version = "0.12.0", features = ["actix4", "tracing"] } shadow-rs = { version = "0.26.0" } +str-buf = "3.0.2" tokio = { version = "1.35.1", features = [ "macros", "rt-multi-thread", @@ -83,6 +85,7 @@ unleash-types = { version = "0.10", features = ["openapi", "hashes"] } unleash-yggdrasil = { version = "0.8.0" } utoipa = { version = "4.2.0", features = ["actix_extras", "chrono"] } utoipa-swagger-ui = { version = "6", features = ["actix-web"] } +xxhash-rust = { version = "0.8.8", features = ["xxh3"] } [dev-dependencies] actix-http = "3.5.1" actix-http-test = "3.1.0" diff --git a/server/src/builder.rs b/server/src/builder.rs index ca1c0688..05ae4ccd 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -1,3 +1,4 @@ +use actix_web::http::header::EntityTag; use chrono::Duration; use dashmap::DashMap; use reqwest::Url; @@ -14,6 +15,7 @@ use crate::{ auth::token_validator::TokenValidator, cli::{CliArgs, EdgeArgs, EdgeMode, OfflineArgs}, error::EdgeError, + filters, hashing, http::{feature_refresher::FeatureRefresher, unleash_client::UnleashClient}, types::{EdgeResult, EdgeToken, TokenType}, }; @@ -24,6 +26,7 @@ type CacheContainer = ( Arc>, Arc>, Arc>, + Arc>, ); type EdgeInfo = ( CacheContainer, @@ -36,10 +39,12 @@ fn build_caches() -> CacheContainer { let token_cache: DashMap = DashMap::default(); let features_cache: DashMap = DashMap::default(); let engine_cache: DashMap = DashMap::default(); + let etag_cache: DashMap = DashMap::default(); ( Arc::new(token_cache), Arc::new(features_cache), Arc::new(engine_cache), + Arc::new(etag_cache), ) } @@ -48,18 +53,20 @@ async fn hydrate_from_persistent_storage( feature_refresher: Arc, storage: Arc, ) { - let (token_cache, features_cache, engine_cache) = cache; + let (token_cache, features_cache, engine_cache, etag_cache) = cache; let tokens = storage.load_tokens().await.unwrap_or_default(); let features = storage.load_features().await.unwrap_or_default(); let refresh_targets = storage.load_refresh_targets().await.unwrap_or_default(); for token in tokens { tracing::debug!("Hydrating tokens {token:?}"); - token_cache.insert(token.token.clone(), token); + token_cache.insert(token.token.clone(), token.clone()); + etag_cache.insert(token.clone(), EntityTag::new_weak("".to_string())); } for (key, features) in features { tracing::debug!("Hydrating features for {key:?}"); features_cache.insert(key.clone(), features.clone()); + update_etags(etag_cache.clone(), &key, &features); let mut engine_state = EngineState::default(); engine_state.take_state(features); engine_cache.insert(key, engine_state); @@ -73,11 +80,25 @@ async fn hydrate_from_persistent_storage( } } +fn update_etags( + etag_cache: Arc>, + key: &str, + features: &ClientFeatures, +) { + let etag = crate::hashing::client_features_to_etag(features); + let token = EdgeToken::from_str(key).unwrap(); + etag_cache.iter_mut().for_each(|mut t| { + if token.subsumes(t.key()) || &token == t.key() { + *t.value_mut() = etag.clone(); + } + }) +} + pub(crate) fn build_offline_mode( client_features: ClientFeatures, tokens: Vec, ) -> EdgeResult { - let (token_cache, features_cache, engine_cache) = build_caches(); + let (token_cache, features_cache, engine_cache, etag_cache) = build_caches(); let edge_tokens: Vec = tokens .iter() @@ -93,8 +114,13 @@ pub(crate) fn build_offline_mode( engine_cache.clone(), client_features.clone(), ); + let filtered = filters::filter_features_for_token(&client_features, &edge_token); + etag_cache.insert( + edge_token.clone(), + hashing::client_features_to_etag(&filtered), + ); } - Ok((token_cache, features_cache, engine_cache)) + Ok((token_cache, features_cache, engine_cache, etag_cache)) } fn build_offline(offline_args: OfflineArgs) -> EdgeResult { @@ -131,7 +157,7 @@ async fn get_data_source(args: &EdgeArgs) -> Option> { } async fn build_edge(args: &EdgeArgs) -> EdgeResult { - let (token_cache, feature_cache, engine_cache) = build_caches(); + let (token_cache, feature_cache, engine_cache, etag_cache) = build_caches(); let persistence = get_data_source(args).await; @@ -162,6 +188,7 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult { unleash_client, feature_cache.clone(), engine_cache.clone(), + etag_cache.clone(), Duration::seconds(args.features_refresh_interval_seconds.try_into().unwrap()), persistence.clone(), )); @@ -173,6 +200,7 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult { token_cache.clone(), feature_cache.clone(), engine_cache.clone(), + etag_cache.clone(), ), feature_refresher.clone(), persistence, @@ -189,7 +217,7 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult { .await; } Ok(( - (token_cache, feature_cache, engine_cache), + (token_cache, feature_cache, engine_cache, etag_cache), Some(token_validator), Some(feature_refresher), persistence, @@ -205,3 +233,101 @@ pub async fn build_caches_and_refreshers(args: CliArgs) -> EdgeResult _ => unreachable!(), } } + +#[cfg(test)] +mod tests { + use unleash_types::client_features::{ClientFeature, Strategy}; + + use super::*; + + #[test] + fn building_an_initial_state_from_backup_also_adds_etags_for_features() { + let client_features = ClientFeatures { + version: 2, + features: vec![ + ClientFeature { + name: "first_feature".into(), + feature_type: Some("Release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: true, + stale: Some(false), + impression_data: Some(false), + project: Some("default".into()), + strategies: Some(vec![Strategy { + name: "gradualRollout".into(), + sort_order: Some(1), + segments: None, + constraints: None, + parameters: None, + variants: None, + }]), + variants: None, + dependencies: None, + }, + ClientFeature { + name: "second_feature".into(), + feature_type: Some("Release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: true, + stale: Some(false), + impression_data: Some(false), + project: Some("eg".into()), + strategies: Some(vec![Strategy { + name: "gradualRollout".into(), + sort_order: Some(1), + segments: None, + constraints: None, + parameters: None, + variants: None, + }]), + variants: None, + dependencies: None, + }, + ClientFeature { + name: "third_feature".into(), + feature_type: Some("Release".into()), + description: None, + created_at: None, + last_seen_at: None, + enabled: true, + stale: Some(false), + impression_data: Some(false), + project: Some("default".into()), + strategies: Some(vec![Strategy { + name: "gradualRollout".into(), + sort_order: Some(1), + segments: None, + constraints: None, + parameters: None, + variants: None, + }]), + variants: None, + dependencies: None, + }, + ], + segments: None, + query: None, + }; + let wildcard_token = EdgeToken::from_str("*:development.somerandomstring").unwrap(); + let subsumed_token = EdgeToken::from_str("eg:development.someotherrandomstring").unwrap(); + let (_, _, _, etag_cache) = build_offline_mode( + client_features, + vec![wildcard_token.token.clone(), subsumed_token.token.clone()], + ) + .unwrap(); + assert!(etag_cache.contains_key(&wildcard_token)); + assert!(etag_cache.contains_key(&subsumed_token)); + assert_eq!( + etag_cache.get(&wildcard_token).unwrap().value(), + &EntityTag::new_weak("248-ISJs9qZR-gpTrJaMsTeZ8g==".into()) + ); + assert_eq!( + etag_cache.get(&subsumed_token).unwrap().value(), + &EntityTag::new_weak("da-ufCoeTJdSw9a7U9Av9k1qQ==".into()) + ); + } +} diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 1196a18d..80a3349f 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -270,6 +270,7 @@ mod tests { use crate::middleware; use crate::tests::{features_from_disk, upstream_server}; use actix_http::Request; + use actix_web::http::header::EntityTag; use actix_web::{ http::header::ContentType, test, @@ -569,11 +570,14 @@ mod tests { let upstream_token_cache = Arc::new(DashMap::default()); let upstream_features_cache = Arc::new(DashMap::default()); let upstream_engine_cache = Arc::new(DashMap::default()); + let upstream_etag_cache = Arc::new(DashMap::default()); + upstream_token_cache.insert(token.token.clone(), token.clone()); let srv = upstream_server( upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let req = reqwest::Client::new(); @@ -605,11 +609,13 @@ mod tests { let upstream_token_cache = Arc::new(DashMap::default()); let upstream_features_cache = Arc::new(DashMap::default()); let upstream_engine_cache = Arc::new(DashMap::default()); + let upstream_etag_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); let srv = upstream_server( upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap(); @@ -870,10 +876,12 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + upstream_etag_cache.clone(), ) .await; let upstream_features = features_from_disk("../examples/hostedexample.json"); @@ -889,11 +897,13 @@ mod tests { let features_cache: Arc> = Arc::new(DashMap::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); + let etag_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { unleash_client: unleash_client.clone(), tokens_to_refresh: Arc::new(Default::default()), features_cache: features_cache.clone(), engine_cache: engine_cache.clone(), + etag_cache: etag_cache.clone(), refresh_interval: Duration::seconds(6000), persistence: None, }); @@ -993,10 +1003,13 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); + let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + upstream_etag_cache.clone(), ) .await; let upstream_features = features_from_disk("../examples/hostedexample.json"); @@ -1013,10 +1026,13 @@ mod tests { let features_cache: Arc> = Arc::new(DashMap::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); + let feature_refresher = Arc::new(FeatureRefresher::new( unleash_client.clone(), features_cache.clone(), engine_cache.clone(), + etag_cache.clone(), Duration::seconds(6000), None, )); diff --git a/server/src/filters.rs b/server/src/filters.rs index 65b73513..02ef22c7 100644 --- a/server/src/filters.rs +++ b/server/src/filters.rs @@ -39,6 +39,20 @@ fn filter_features( .collect::>() } +pub fn filter_features_for_token(features: &ClientFeatures, token: &EdgeToken) -> ClientFeatures { + let f = FeatureFilterSet::from(project_filter(token)); + let filtered = features + .features + .iter() + .filter(|feature| f.apply(feature)) + .cloned() + .collect::>(); + ClientFeatures { + features: filtered, + ..features.clone() + } +} + pub(crate) fn filter_client_features( feature_cache: &Ref<'_, String, ClientFeatures>, filters: &FeatureFilterSet, diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index b55c7237..d02dfbe7 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -810,7 +810,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_post_requests_resolves_context_values_correctly() { - let (token_cache, features_cache, engine_cache) = build_offline_mode( + let (token_cache, features_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_constraint_requiring_user_id_of_seven(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -824,6 +824,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(features_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service(web::scope("/api/frontend").service(super::post_frontend_all_features)), ) .await; @@ -860,7 +861,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_context_values_correctly() { - let (feature_cache, token_cache, engine_cache) = build_offline_mode( + let (feature_cache, token_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_constraint_requiring_user_id_of_seven(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -873,6 +874,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service(web::scope("/api/proxy").service(super::get_proxy_all_features)), ) .await; @@ -907,7 +909,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_context_values_correctly_with_enabled_filter() { - let (token_cache, features_cache, engine_cache) = build_offline_mode( + let (token_cache, features_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -921,6 +923,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(features_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service(web::scope("/api/proxy").service(super::get_enabled_proxy)), ) .await; @@ -984,13 +987,14 @@ mod tests { #[tokio::test] async fn when_running_in_offline_mode_with_proxy_key_should_not_filter_features() { let client_features = client_features_with_constraint_requiring_user_id_of_seven(); - let (token_cache, feature_cache, engine_cache) = + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode(client_features.clone(), vec!["secret-123".to_string()]).unwrap(); let app = test::init_service( App::new() .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .app_data(Data::new(EdgeMode::Offline(OfflineArgs { bootstrap_file: None, tokens: vec!["secret-123".into()], @@ -1013,7 +1017,7 @@ mod tests { #[tokio::test] async fn frontend_api_filters_evaluated_toggles_to_tokens_access() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], ) @@ -1023,6 +1027,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service(web::scope("/api/frontend").service(super::get_frontend_all_features)), ) .await; @@ -1101,7 +1106,7 @@ mod tests { #[tokio::test] async fn can_get_single_feature() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], ) @@ -1111,6 +1116,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service( web::scope("/api").configure(|cfg| super::configure_frontend_api(cfg, false)), ), @@ -1130,7 +1136,7 @@ mod tests { #[tokio::test] async fn trying_to_evaluate_feature_you_do_not_have_access_to_will_give_not_found() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], ) @@ -1140,6 +1146,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service( web::scope("/api").configure(|cfg| super::configure_frontend_api(cfg, false)), ), @@ -1161,7 +1168,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], ) @@ -1174,6 +1181,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service( web::scope("/api").configure(|cfg| super::configure_frontend_api(cfg, false)), ), @@ -1201,7 +1209,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], ) @@ -1216,6 +1224,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service( web::scope("/api").configure(|cfg| super::configure_frontend_api(cfg, false)), ), @@ -1246,7 +1255,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/ip_address_feature.json"); let auth_key = "gard:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], ) @@ -1261,6 +1270,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service( web::scope("/api").configure(|cfg| super::configure_frontend_api(cfg, false)), ), @@ -1284,7 +1294,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/ip_address_feature.json"); let auth_key = "gard:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, engine_cache, etag_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], ) @@ -1294,6 +1304,7 @@ mod tests { .app_data(Data::from(token_cache)) .app_data(Data::from(feature_cache)) .app_data(Data::from(engine_cache)) + .app_data(Data::from(etag_cache)) .service( web::scope("/api").configure(|cfg| super::configure_frontend_api(cfg, true)), ), diff --git a/server/src/hashing.rs b/server/src/hashing.rs new file mode 100644 index 00000000..62f305c9 --- /dev/null +++ b/server/src/hashing.rs @@ -0,0 +1,30 @@ +use actix_http::header::{HeaderName, HeaderValue, TryIntoHeaderPair}; +use actix_web::http::header::{ETag, EntityTag}; +use base64::Engine; +use unleash_types::client_features::ClientFeatures; +use xxhash_rust::xxh3; + +use crate::error::EdgeError; + +pub fn bytes_to_etag(bytes: &[u8]) -> EntityTag { + let hash = xxh3::xxh3_128(bytes); + let base64 = base64::prelude::BASE64_URL_SAFE.encode(hash.to_le_bytes()); + let hash = format!("{:x}-{}", bytes.len(), base64); + EntityTag::new_weak(hash) +} + +pub fn bytes_to_etag_header(bytes: &[u8]) -> Result<(HeaderName, HeaderValue), EdgeError> { + let etag = bytes_to_etag(bytes); + entity_tag_to_etag_header(&etag) +} + +pub fn entity_tag_to_etag_header(etag: &EntityTag) -> Result<(HeaderName, HeaderValue), EdgeError> { + ETag(etag.clone()) + .try_into_pair() + .map_err(|_| EdgeError::EdgeTokenError) +} + +pub fn client_features_to_etag(features: &ClientFeatures) -> EntityTag { + let bytes = serde_json::to_vec(features).unwrap(); + bytes_to_etag(&bytes) +} diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 738a3967..14d11377 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use std::{sync::Arc, time::Duration}; -use actix_web::http::header::EntityTag; +use actix_web::http::header::{EntityTag, IfNoneMatch}; use chrono::Utc; use dashmap::DashMap; use reqwest::StatusCode; @@ -16,7 +16,8 @@ use unleash_yggdrasil::EngineState; use super::unleash_client::UnleashClient; use crate::error::{EdgeError, FeatureError}; -use crate::filters::{filter_client_features, FeatureFilterSet}; +use crate::filters::{filter_client_features, project_filter, FeatureFilterSet}; + use crate::types::{ build, ClientTokenRequest, ClientTokenResponse, EdgeResult, TokenType, TokenValidationStatus, }; @@ -102,6 +103,7 @@ pub struct FeatureRefresher { pub unleash_client: Arc, pub tokens_to_refresh: Arc>, pub features_cache: Arc>, + pub etag_cache: Arc>, pub engine_cache: Arc>, pub refresh_interval: chrono::Duration, pub persistence: Option>, @@ -115,6 +117,7 @@ impl Default for FeatureRefresher { tokens_to_refresh: Arc::new(DashMap::default()), features_cache: Default::default(), engine_cache: Default::default(), + etag_cache: Default::default(), persistence: None, } } @@ -138,6 +141,7 @@ impl FeatureRefresher { unleash_client: Arc, features: Arc>, engines: Arc>, + etag_cache: Arc>, features_refresh_interval: chrono::Duration, persistence: Option>, ) -> Self { @@ -146,6 +150,7 @@ impl FeatureRefresher { tokens_to_refresh: Arc::new(DashMap::default()), features_cache: features, engine_cache: engines, + etag_cache, refresh_interval: features_refresh_interval, persistence, } @@ -157,6 +162,7 @@ impl FeatureRefresher { tokens_to_refresh: Arc::new(Default::default()), features_cache: Arc::new(Default::default()), engine_cache: Arc::new(Default::default()), + etag_cache: Arc::new(Default::default()), refresh_interval: chrono::Duration::seconds(10), persistence: None, } @@ -268,6 +274,10 @@ impl FeatureRefresher { /// /// Registers a token for refresh, the token will be discarded if it can be subsumed by another previously registered token pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option) { + self.etag_cache.insert( + token.clone(), + etag.clone().unwrap_or(EntityTag::new_weak("".into())), + ); if !self.tokens_to_refresh.contains_key(&token.token) { let use_new_bulk_endpoint_for_metrics = self .unleash_client @@ -298,6 +308,23 @@ impl FeatureRefresher { } } + pub fn has_token_with_etag(&self, if_none_match: &IfNoneMatch) -> bool { + let etags = match if_none_match { + IfNoneMatch::Any => return true, + IfNoneMatch::Items(etags) => etags, + }; + etags + .iter() + .any(|etag| self.get_etags().iter().any(|e| e.weak_eq(etag))) + } + + pub fn get_etags(&self) -> Vec { + self.tokens_to_refresh + .iter() + .filter_map(|t| t.value().etag.clone()) + .collect() + } + pub async fn start_refresh_features_background_task(&self) { loop { tokio::select! { @@ -339,7 +366,6 @@ impl FeatureRefresher { ClientFeaturesResponse::Updated(features, etag) => { debug!("Got updated client features. Updating features with {etag:?}"); let key = cache_key(&refresh.token); - self.update_last_refresh(&refresh.token, etag); self.features_cache .entry(key.clone()) .and_modify(|existing_data| { @@ -348,6 +374,8 @@ impl FeatureRefresher { *existing_data = updated_data; }) .or_insert_with(|| features.clone()); + self.update_last_refresh(&refresh.token, etag.clone()); + self.engine_cache .entry(key.clone()) .and_modify(|engine| { @@ -426,8 +454,31 @@ impl FeatureRefresher { pub fn update_last_refresh(&self, token: &EdgeToken, etag: Option) { self.tokens_to_refresh .alter(&token.token, |_k, old_refresh| { - old_refresh.successful_refresh(&self.refresh_interval, etag) + old_refresh.successful_refresh(&self.refresh_interval, etag.clone()) }); + debug!("Updated last refresh with etag: {:?}", etag); + debug!("Have an etag cache of: {:?}", self.etag_cache.len()); + if let Some(_e) = etag { + for mut cached in self + .etag_cache + .iter_mut() + .filter(|etag| token.subsumes(etag.key())) + { + debug!("Edgetoken {:?} is subsumed by {:?}", cached.key(), token); + let feature_filter = + FeatureFilterSet::default().with_filter(project_filter(cached.key())); + let filtered_features = self + .get_features_by_filter(cached.key(), &feature_filter) + .unwrap(); + let etag = crate::hashing::client_features_to_etag(&filtered_features); + debug!( + "Updating etag cache for {:?} with {:?}", + cached.key(), + etag.clone() + ); + *cached.value_mut() = etag; + } + } } } @@ -482,12 +533,14 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -514,12 +567,14 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -550,11 +605,13 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -593,11 +650,13 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -645,11 +704,13 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -701,12 +762,13 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); - + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -742,12 +804,14 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -778,12 +842,14 @@ mod tests { ); let features_cache = Arc::new(DashMap::default()); let engines_cache = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let duration = Duration::seconds(5); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engines_cache, + etag_cache, duration, None, ); @@ -849,6 +915,7 @@ mod tests { upstream_token_cache: Arc>, upstream_features_cache: Arc>, upstream_engine_cache: Arc>, + upstream_etag_cache: Arc>, ) -> TestServer { test_server(move || { HttpService::new(map_config( @@ -856,6 +923,7 @@ mod tests { .app_data(web::Data::from(upstream_features_cache.clone())) .app_data(web::Data::from(upstream_engine_cache.clone())) .app_data(web::Data::from(upstream_token_cache.clone())) + .app_data(web::Data::from(upstream_etag_cache.clone())) .service(web::scope("/api").configure(crate::client_api::configure_client_api)), |_| AppConfig::default(), )) @@ -869,19 +937,24 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache = Arc::new(DashMap::default()); let server = client_api_test_server( upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); let features_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); + let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engine_cache, + etag_cache, Duration::seconds(60), None, ); @@ -909,6 +982,8 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(token_cache); + let upstream_etag_cache = Arc::new(DashMap::default()); + let example_features = features_from_disk("../examples/features.json"); let cache_key = cache_key(&token); let mut engine_state = EngineState::default(); @@ -919,15 +994,18 @@ mod tests { upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); let features_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let feature_refresher = FeatureRefresher::new( Arc::new(unleash_client), features_cache, engine_cache, + etag_cache, Duration::milliseconds(1), None, ); @@ -960,12 +1038,17 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let token_cache_to_modify = upstream_token_cache.clone(); let mut valid_token = EdgeToken::try_from("*:development.secret123".to_string()).unwrap(); valid_token.token_type = Some(TokenType::Client); valid_token.status = Validated; upstream_token_cache.insert(valid_token.token.clone(), valid_token.clone()); let example_features = features_from_disk("../examples/features.json"); + upstream_etag_cache.insert( + valid_token.clone(), + crate::hashing::client_features_to_etag(&example_features), + ); let cache_key = cache_key(&valid_token); let mut engine_state = EngineState::default(); engine_state.take_state(example_features.clone()); @@ -975,6 +1058,7 @@ mod tests { upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); @@ -1001,6 +1085,7 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let token_cache_to_modify = upstream_token_cache.clone(); let mut dx_token = EdgeToken::try_from("dx:development.secret123".to_string()).unwrap(); dx_token.token_type = Some(TokenType::Client); @@ -1020,6 +1105,7 @@ mod tests { upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); @@ -1050,6 +1136,7 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let mut dx_token = EdgeToken::try_from("dx:development.secret123".to_string()).unwrap(); dx_token.token_type = Some(TokenType::Client); dx_token.status = Validated; @@ -1068,6 +1155,7 @@ mod tests { upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); @@ -1106,6 +1194,7 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let mut dx_token = EdgeToken::from_str("dx:development.secret123").unwrap(); dx_token.token_type = Some(TokenType::Client); dx_token.status = Validated; @@ -1129,6 +1218,7 @@ mod tests { upstream_token_cache, upstream_features_cache, upstream_engine_cache, + upstream_etag_cache, ) .await; let unleash_client = UnleashClient::new(server.url("/").as_str(), None).unwrap(); @@ -1222,11 +1312,16 @@ mod tests { Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let mut eg_token = EdgeToken::from_str("eg:development.devsecret").unwrap(); eg_token.token_type = Some(TokenType::Client); eg_token.status = Validated; upstream_token_cache.insert(eg_token.token.clone(), eg_token.clone()); let example_features = features_from_disk("../examples/hostedexample.json"); + upstream_etag_cache.insert( + eg_token.clone(), + crate::hashing::client_features_to_etag(&example_features), + ); let cache_key = cache_key(&eg_token); upstream_features_cache.insert(cache_key.clone(), example_features.clone()); let mut engine_state = EngineState::default(); @@ -1236,6 +1331,7 @@ mod tests { upstream_token_cache, upstream_features_cache.clone(), upstream_engine_cache, + upstream_etag_cache.clone(), ) .await; let features_cache: Arc> = Arc::new(DashMap::default()); @@ -1245,7 +1341,8 @@ mod tests { tokens_to_refresh: Arc::new(Default::default()), features_cache: features_cache.clone(), engine_cache: Arc::new(Default::default()), - refresh_interval: chrono::Duration::seconds(0), + etag_cache: Arc::new(Default::default()), + refresh_interval: Duration::seconds(0), persistence: None, }; @@ -1279,7 +1376,7 @@ mod tests { .collect(); dx_data.remove(0); let mut token = EdgeToken::from_str("[]:development.somesecret").unwrap(); - token.status = TokenValidationStatus::Validated; + token.status = Validated; token.projects = vec![String::from("dx")]; let updated = super::update_projects_from_feature_update(&token, &features, &dx_data); diff --git a/server/src/internal_backstage.rs b/server/src/internal_backstage.rs index a9c3b918..bbffe2b7 100644 --- a/server/src/internal_backstage.rs +++ b/server/src/internal_backstage.rs @@ -79,6 +79,7 @@ pub async fn tokens( token_validation_status, })) } + pub fn configure_internal_backstage( cfg: &mut web::ServiceConfig, metrics_handler: PrometheusMetricsHandler, @@ -104,7 +105,7 @@ mod tests { use crate::tokens::cache_key; use crate::types::{BuildInfo, EdgeToken, Status, TokenInfo, TokenType, TokenValidationStatus}; use actix_web::body::MessageBody; - use actix_web::http::header::ContentType; + use actix_web::http::header::{ContentType, EntityTag}; use actix_web::test; use actix_web::{web, App}; use chrono::Duration; @@ -201,6 +202,7 @@ mod tests { Arc::new(DashMap::default()), Arc::new(DashMap::default()), Arc::new(DashMap::default()), + Arc::new(DashMap::default()), ) .await; let unleash_client = @@ -240,10 +242,13 @@ mod tests { Arc::new(DashMap::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); + let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + upstream_etag_cache.clone(), ) .await; let upstream_features = crate::tests::features_from_disk("../examples/hostedexample.json"); @@ -255,15 +260,21 @@ mod tests { upstream_known_token.clone(), ); upstream_features_cache.insert(cache_key(&upstream_known_token), upstream_features.clone()); + upstream_etag_cache.insert( + upstream_known_token.clone(), + crate::hashing::client_features_to_etag(&upstream_features), + ); let unleash_client = Arc::new(UnleashClient::new(server.url("/").as_str(), None).unwrap()); let features_cache: Arc> = Arc::new(DashMap::default()); let token_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); + let etag_cache = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { unleash_client: unleash_client.clone(), tokens_to_refresh: Arc::new(Default::default()), features_cache: features_cache.clone(), engine_cache: engine_cache.clone(), + etag_cache: etag_cache.clone(), refresh_interval: Duration::seconds(6000), persistence: None, }); @@ -279,6 +290,7 @@ mod tests { .app_data(web::Data::from(engine_cache.clone())) .app_data(web::Data::from(token_cache.clone())) .app_data(web::Data::from(feature_refresher.clone())) + .app_data(web::Data::from(etag_cache.clone())) .service(web::scope("/internal-backstage").service(super::tokens)) .service( web::scope("/api") diff --git a/server/src/lib.rs b/server/src/lib.rs index 07376787..3b8fd2ce 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -10,6 +10,7 @@ pub mod edge_api; pub mod error; pub mod filters; pub mod frontend_api; +pub mod hashing; pub mod health_checker; pub mod http; pub mod internal_backstage; @@ -39,6 +40,7 @@ mod tests { use actix_http_test::{test_server, TestServer}; use actix_service::map_config; use actix_web::dev::AppConfig; + use actix_web::http::header::EntityTag; use actix_web::{web, App}; use dashmap::DashMap; use unleash_types::client_features::ClientFeatures; @@ -60,6 +62,7 @@ mod tests { upstream_token_cache: Arc>, upstream_features_cache: Arc>, upstream_engine_cache: Arc>, + upstream_etag_cache: Arc>, ) -> TestServer { let token_validator = Arc::new(TokenValidator { unleash_client: Arc::new(Default::default()), @@ -84,6 +87,7 @@ mod tests { .app_data(web::Data::from(upstream_token_cache.clone())) .app_data(web::Data::new(metrics_cache)) .app_data(web::Data::new(connect_via)) + .app_data(web::Data::from(upstream_etag_cache.clone())) .service( web::scope("/api") .configure(crate::client_api::configure_client_api) diff --git a/server/src/main.rs b/server/src/main.rs index 36be1e2c..8f6ccc46 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use actix_cors::Cors; -use actix_middleware_etag::Etag; + use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; use clap::Parser; @@ -55,7 +55,7 @@ async fn main() -> Result<(), anyhow::Error> { instance_id: args.clone().instance_id, }; let ( - (token_cache, features_cache, engine_cache), + (token_cache, features_cache, engine_cache, etag_cache), token_validator, feature_refresher, persistence, @@ -90,7 +90,8 @@ async fn main() -> Result<(), anyhow::Error> { .app_data(web::Data::from(metrics_cache.clone())) .app_data(web::Data::from(token_cache.clone())) .app_data(web::Data::from(features_cache.clone())) - .app_data(web::Data::from(engine_cache.clone())); + .app_data(web::Data::from(engine_cache.clone())) + .app_data(web::Data::from(etag_cache.clone())); app = match token_validator.clone() { Some(v) => app.app_data(web::Data::from(v)), None => app, @@ -101,7 +102,9 @@ async fn main() -> Result<(), anyhow::Error> { }; app.service( web::scope(&base_path) - .wrap(Etag) + .wrap(unleash_edge::middleware::etag::EdgeETag { + etag_cache: etag_cache.clone(), + }) .wrap(actix_web::middleware::Compress::default()) .wrap(actix_web::middleware::NormalizePath::default()) .wrap(cors_middleware) diff --git a/server/src/middleware/client_token_from_frontend_token.rs b/server/src/middleware/client_token_from_frontend_token.rs index bbbfe6d5..3af77ec8 100644 --- a/server/src/middleware/client_token_from_frontend_token.rs +++ b/server/src/middleware/client_token_from_frontend_token.rs @@ -57,6 +57,7 @@ mod tests { use actix_http_test::{test_server, TestServer}; use actix_service::map_config; use actix_web::dev::AppConfig; + use actix_web::http::header::EntityTag; use actix_web::{web, App}; use chrono::Duration; use dashmap::DashMap; @@ -77,6 +78,7 @@ mod tests { local_token_cache: Arc>, local_features_cache: Arc>, local_engine_cache: Arc>, + local_etag_cache: Arc>, ) -> TestServer { let token_validator = Arc::new(TokenValidator { unleash_client: unleash_client.clone(), @@ -87,6 +89,7 @@ mod tests { unleash_client.clone(), local_features_cache.clone(), local_engine_cache.clone(), + local_etag_cache.clone(), Duration::seconds(5), None, )); @@ -102,6 +105,7 @@ mod tests { .app_data(web::Data::from(local_engine_cache.clone())) .app_data(web::Data::from(local_token_cache.clone())) .app_data(web::Data::from(feature_refresher.clone())) + .app_data(web::Data::from(local_etag_cache.clone())) .service( web::scope("/api") .configure(crate::client_api::configure_client_api) @@ -132,10 +136,12 @@ mod tests { let upstream_token_cache: Arc> = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); let upstream_server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + upstream_etag_cache.clone(), ) .await; info!("Upstream server: {:?}", upstream_server.url("/")); @@ -154,11 +160,13 @@ mod tests { Arc::new(DashMap::default()); let local_token_cache: Arc> = Arc::new(DashMap::default()); let local_engine_cache: Arc> = Arc::new(DashMap::default()); + let local_etag_cache: Arc> = Arc::new(DashMap::default()); let local_server = local_server( arced_client.clone(), local_token_cache, local_features_cache, local_engine_cache, + local_etag_cache, ) .await; let client = reqwest::Client::default(); @@ -193,6 +201,11 @@ mod tests { upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); upstream_token_cache.insert(client_token.token.clone(), client_token.clone()); upstream_features_cache.insert(client_token.environment.clone().unwrap(), features.clone()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); + upstream_etag_cache.insert( + client_token.clone(), + crate::hashing::client_features_to_etag(&features), + ); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let mut engine = EngineState::default(); engine.take_state(features.clone()); @@ -201,6 +214,7 @@ mod tests { upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + upstream_etag_cache.clone(), ) .await; info!("Upstream server: {:?}", upstream_server.url("/")); @@ -219,11 +233,13 @@ mod tests { Arc::new(DashMap::default()); let local_token_cache: Arc> = Arc::new(DashMap::default()); let local_engine_cache: Arc> = Arc::new(DashMap::default()); + let local_etag_cache: Arc> = Arc::new(DashMap::default()); let local_server = local_server( arced_client.clone(), local_token_cache, local_features_cache, local_engine_cache, + local_etag_cache, ) .await; let client = reqwest::Client::default(); @@ -259,6 +275,8 @@ mod tests { frontend_token.token_type = Some(TokenType::Frontend); let upstream_features_cache: Arc> = Arc::new(DashMap::default()); + let upstream_etag_cache: Arc> = Arc::new(DashMap::default()); + let upstream_token_cache: Arc> = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); @@ -266,6 +284,7 @@ mod tests { upstream_token_cache.clone(), upstream_features_cache.clone(), upstream_engine_cache.clone(), + upstream_etag_cache.clone(), ) .await; let unleash_client = UnleashClient::from_url( @@ -281,12 +300,14 @@ mod tests { Arc::new(DashMap::default()); let local_token_cache: Arc> = Arc::new(DashMap::default()); let local_engine_cache: Arc> = Arc::new(DashMap::default()); + let local_etag_cache: Arc> = Arc::new(DashMap::default()); let local_server = local_server( Arc::new(unleash_client), local_token_cache, local_features_cache, local_engine_cache, + local_etag_cache, ) .await; let client = reqwest::Client::default(); diff --git a/server/src/middleware/etag.rs b/server/src/middleware/etag.rs new file mode 100644 index 00000000..6abd9dcd --- /dev/null +++ b/server/src/middleware/etag.rs @@ -0,0 +1,155 @@ +use std::pin::Pin; +use std::str::FromStr; +use std::sync::Arc; + +use actix_http::header::HeaderValue; +use actix_service::{Service, Transform}; +use actix_web::body::{BodySize, BoxBody, EitherBody, MessageBody, None as BodyNone}; +use actix_web::dev::{ServiceRequest, ServiceResponse}; +use actix_web::http::header::{ETag, EntityTag, IfNoneMatch, TryIntoHeaderPair}; +use actix_web::http::Method; +use actix_web::web::Bytes; +use actix_web::{HttpMessage, HttpResponse}; +use base64::Engine; +use core::fmt::Write; +use dashmap::DashMap; +use futures::{ + future::{ok, Ready}, + Future, +}; +use xxhash_rust::xxh3::xxh3_128; + +use crate::types::EdgeToken; + +#[derive(Default, Clone)] +pub struct EdgeETag { + pub etag_cache: Arc>, +} + +impl Transform for EdgeETag +where + S: Service, Error = actix_web::Error>, + S::Future: 'static, + B: MessageBody + 'static, +{ + type Response = ServiceResponse>; + type Error = actix_web::Error; + type Transform = EdgeETagMiddleware; + type InitError = (); + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ok(EdgeETagMiddleware { + service: service, + etag_cache: self.etag_cache.clone(), + }) + } +} +type Buffer = str_buf::StrBuf<62>; + +pub struct EdgeETagMiddleware { + service: S, + etag_cache: Arc>, +} + +impl Service for EdgeETagMiddleware +where + S: Service, Error = actix_web::Error>, + S::Future: 'static, + B: MessageBody + 'static, +{ + type Response = ServiceResponse>; + type Error = actix_web::Error; + type Future = + Pin>, Self::Error>>>>; + + actix_service::forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + let request_etag_header: Option = req.get_header(); + let method = req.method().clone(); + let auth_header = header_to_edgetoken(req.headers().get("Authorization")); + if we_know_this_etag_from_upstream( + self.etag_cache.clone(), + &auth_header, + &request_etag_header, + ) { + Box::pin(async move { + return Ok(ServiceResponse::new( + req.request().clone(), + HttpResponse::NotModified().body(BodyNone::new()), + ) + .into_response(HttpResponse::NotModified().body(BodyNone::new())) + .map_into_right_body()); + }) + } else { + let fut = self.service.call(req); + Box::pin(async move { + let res: ServiceResponse = fut.await?; + match method { + Method::GET => { + let mut modified = true; + let mut payload: Option = None; + let mut res = res.map_body(|_h, body| match body.size() { + BodySize::Sized(_size) => { + let bytes = body.try_into_bytes().unwrap_or_else(|_| Bytes::new()); + payload = Some(bytes.clone()); + + bytes.clone().boxed() + } + _ => body.boxed(), + }); + if let Some(bytes) = payload { + let response_hash = xxh3_128(&bytes); + let base64 = base64::prelude::BASE64_URL_SAFE + .encode(response_hash.to_le_bytes()); + let mut buff = Buffer::new(); + let _ = write!(buff, "{:x}-{}", bytes.len(), base64); + let tag = EntityTag::new_weak(buff.to_string()); + if let Some(request_etag_header) = request_etag_header { + if request_etag_header == IfNoneMatch::Any + || request_etag_header.to_string() == tag.to_string() + { + modified = false + } + } + if modified { + if let Ok((name, value)) = ETag(tag.clone()).try_into_pair() { + res.headers_mut().insert(name, value); + } + } + } + + Ok(match modified { + false => res + .into_response(HttpResponse::NotModified().body(BodyNone::new())) + .map_into_right_body(), + true => res.map_into_left_body(), + }) + } + _ => Ok(res.map_into_boxed_body().map_into_left_body()), + } + }) + } + } +} + +fn header_to_edgetoken(header: Option<&HeaderValue>) -> Option { + header + .map(|h| h.to_str().unwrap()) + .and_then(|header_str| EdgeToken::from_str(header_str).ok()) +} + +fn we_know_this_etag_from_upstream( + etag_cache: Arc>, + client_token: &Option, + if_none_match: &Option, +) -> bool { + if let (Some(if_none), Some(token)) = (if_none_match, client_token) { + etag_cache.get(token).map_or(false, |etag| { + if_none == &IfNoneMatch::Any || if_none.to_string() == etag.to_string() + }) + } else { + false + } +} diff --git a/server/src/middleware/mod.rs b/server/src/middleware/mod.rs index 61b7b45f..6a844083 100644 --- a/server/src/middleware/mod.rs +++ b/server/src/middleware/mod.rs @@ -8,3 +8,5 @@ pub mod validate_token; pub mod client_token_from_frontend_token; pub mod enrich_with_client_ip; + +pub mod etag; diff --git a/server/src/offline/offline_hotload.rs b/server/src/offline/offline_hotload.rs index 2047dfe5..7cf5236c 100644 --- a/server/src/offline/offline_hotload.rs +++ b/server/src/offline/offline_hotload.rs @@ -62,6 +62,7 @@ pub(crate) fn load_offline_engine_cache( crate::tokens::cache_key(edge_token), client_features.clone(), ); + let mut engine_state = EngineState::default(); engine_state.take_state(client_features); engine_cache.insert(crate::tokens::cache_key(edge_token), engine_state);