Skip to content

Commit

Permalink
Add flag
Browse files Browse the repository at this point in the history
  • Loading branch information
sjaanus committed Jan 10, 2025
1 parent cbdb521 commit a9e6c99
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 2 deletions.
19 changes: 17 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,19 @@ tokio = { version = "1.42.0", features = [
"fs",
] }
tokio-stream = { version = "0.1.17" }
<<<<<<< Updated upstream
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] }
ulid = "1.1.3"
unleash-types = { version = "0.15.3", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.14.5" }
=======
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
ulid = "1.1.2"
unleash-types = { version = "0.15.2", features = ["openapi", "hashes"] }
unleash-yggdrasil = { version = "0.14.1" }
>>>>>>> Stashed changes
utoipa = { version = "5", features = ["actix_extras", "chrono"] }
utoipa-swagger-ui = { version = "8", features = ["actix-web"] }
[dev-dependencies]
Expand Down
61 changes: 61 additions & 0 deletions server/src/feature_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,15 @@ impl FeatureCache {
self.send_full_update(key);
}

<<<<<<< Updated upstream
pub fn apply_delta(&self, key: String, delta: &ClientFeaturesDelta) {
let client_features = ClientFeatures {
version : 2,
=======
pub fn apply_delta(&self, key: String, token: &EdgeToken, delta: ClientFeaturesDelta) {
let client_features = ClientFeatures {
version : 1,
>>>>>>> Stashed changes
features : delta.updated.clone(),
segments: delta.segments.clone(),
query: None,
Expand All @@ -78,10 +84,35 @@ impl FeatureCache {
self.features
.entry(key.clone())
.and_modify(|existing_features| {
<<<<<<< Updated upstream
existing_features.modify_in_place(delta);
})
.or_insert(client_features);
self.send_full_update(key);
=======
let updated = update_client_features_delta(token, existing_features, &delta); // TODO: is this replacing or merging the flags
*existing_features = updated;
})
.or_insert(client_features);
self.send_full_update(key);

// let mut current_state = self.compiled_state.take().unwrap_or_default();
// let segment_map = build_segment_map(&delta.segments);
// let mut warnings: Vec<EvalWarning> = vec![];
// for removed in delta.removed.clone() {
// current_state.remove(&removed);
// }
// for update in delta.updated.clone() {
// let updated_state = compile(&update, &segment_map, &mut warnings);
// current_state.insert(update.name.clone(), updated_state);
// }
// self.compiled_state = Some(current_state);
// if warnings.is_empty() {
// None
// } else {
// Some(warnings)
// }
>>>>>>> Stashed changes
}

pub fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -116,7 +147,37 @@ fn update_client_features(
s
}),
query: old.query.clone().or(update.query.clone()),
<<<<<<< Updated upstream
meta: old.meta.clone().or(update.meta.clone()),
=======
meta: None,
}
}

fn update_client_features_delta(
token: &EdgeToken,
old: &ClientFeatures,
delta: &ClientFeaturesDelta,
) -> ClientFeatures {
let mut updated_features =
update_projects_from_feature_update(token, &old.features, &delta.updated);

for removed_feature in &delta.removed {
updated_features.retain(|feature| feature.name != *removed_feature);
}
updated_features.sort();

let segments = merge_segments_update(old.segments.clone(), delta.segments.clone());
ClientFeatures {
version: 1,
features: updated_features,
segments: segments.map(|mut s| {
s.sort();
s
}),
query: None,
meta: None,
>>>>>>> Stashed changes
}
}

Expand Down
145 changes: 145 additions & 0 deletions server/src/http/refresher/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use eventsource_client::Client;
use futures::TryStreamExt;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
use unleash_types::client_features::{ClientFeatures};
=======
use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta};
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
use unleash_types::client_metrics::{ClientApplication, MetricsMetadata};
use unleash_yggdrasil::EngineState;

Expand All @@ -18,9 +22,13 @@ use crate::filters::{filter_client_features, FeatureFilterSet};
use crate::http::headers::{
UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
};
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
use crate::types::{
build, EdgeResult, TokenType, TokenValidationStatus,
};
=======
use crate::types::{build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus};
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
use crate::{
persistence::EdgePersistence,
tokens::{cache_key, simplify},
Expand Down Expand Up @@ -50,8 +58,13 @@ pub struct FeatureRefresher {
pub persistence: Option<Arc<dyn EdgePersistence>>,
pub strict: bool,
pub streaming: bool,
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
pub client_meta_information: ClientMetaInformation,
pub delta: bool,
=======
pub delta: bool,
pub app_name: String,
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
}

impl Default for FeatureRefresher {
Expand All @@ -65,8 +78,13 @@ impl Default for FeatureRefresher {
persistence: None,
strict: true,
streaming: false,
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
client_meta_information: Default::default(),
delta: false,
=======
delta: false,
app_name: "unleash_edge".into(),
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
}
}
}
Expand Down Expand Up @@ -103,8 +121,13 @@ pub enum FeatureRefresherMode {
pub struct FeatureRefreshConfig {
features_refresh_interval: chrono::Duration,
mode: FeatureRefresherMode,
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
client_meta_information: ClientMetaInformation,
delta: bool,
=======
delta: bool,
app_name: String,
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
}

impl FeatureRefreshConfig {
Expand Down Expand Up @@ -140,8 +163,13 @@ impl FeatureRefresher {
persistence,
strict: config.mode != FeatureRefresherMode::Dynamic,
streaming: config.mode == FeatureRefresherMode::Streaming,
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
client_meta_information: config.client_meta_information,
delta: config.delta,
=======
delta : config.delta,
app_name: config.app_name,
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
}
}

Expand Down Expand Up @@ -412,7 +440,11 @@ impl FeatureRefresher {
pub async fn hydrate_new_tokens(&self) {
let hydrations = self.get_tokens_never_refreshed();
for hydration in hydrations {
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
if cfg!(feature = "delta") && self.delta {
=======
if self.delta {
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
self.refresh_single_delta(hydration).await;
} else {
self.refresh_single(hydration).await;
Expand All @@ -422,12 +454,19 @@ impl FeatureRefresher {
pub async fn refresh_features(&self) {
let refreshes = self.get_tokens_due_for_refresh();
for refresh in refreshes {
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
if cfg!(feature = "delta") && self.delta {
=======
if self.delta {
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
self.refresh_single_delta(refresh).await;
} else {
self.refresh_single(refresh).await;
}
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs

=======
>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
}
}

Expand Down Expand Up @@ -465,6 +504,45 @@ impl FeatureRefresher {
new_state
});
}
<<<<<<< Updated upstream:server/src/http/refresher/feature_refresher.rs
=======

async fn handle_client_features_delta_updated(
&self,
refresh_token: &EdgeToken,
features: ClientFeaturesDelta,
etag: Option<EntityTag>,
) {
debug!("Got updated client features delta. Updating features with {etag:?}");
let key = cache_key(refresh_token);
self.update_last_refresh(refresh_token, etag, features.updated.len()); /// TODO: why we need to set updated here
self.features_cache
.modify(key.clone(), refresh_token, features.clone());
self.engine_cache
.entry(key.clone())
.and_modify(|engine| {
if let Some(f) = self.features_cache.get(&key) {
let mut new_state = EngineState::default();
let warnings = new_state.take_state(f.clone());
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
};
*engine = new_state;

}
})
.or_insert_with(|| {
let mut new_state = EngineState::default();

let warnings = new_state.take_state(features);
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
};
new_state
});
}

>>>>>>> Stashed changes:server/src/http/feature_refresher.rs
pub async fn refresh_single(&self, refresh: TokenRefresh) {
let features_result = self
.unleash_client
Expand Down Expand Up @@ -531,6 +609,73 @@ impl FeatureRefresher {
}
}
}

pub async fn refresh_single_delta(&self, refresh: TokenRefresh) {
let features_result = self
.unleash_client
.get_client_features_delta(ClientFeaturesRequest {
api_key: refresh.token.token.clone(),
etag: refresh.etag,
})
.await;

match features_result {
Ok(feature_response) => match feature_response {
ClientFeaturesDeltaResponse::NoUpdate(tag) => {
debug!("No update needed. Will update last check time with {tag}");
self.update_last_check(&refresh.token.clone());
}
ClientFeaturesDeltaResponse::Updated(features, etag) => {
self.handle_client_features_updated(&refresh.token, features, etag)
.await
}
},
Err(e) => {
match e {
EdgeError::ClientFeaturesFetchError(fe) => {
match fe {
FeatureError::Retriable(status_code) => match status_code {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
info!("Upstream is having some problems, increasing my waiting period");
self.backoff(&refresh.token);
}
StatusCode::TOO_MANY_REQUESTS => {
info!("Got told that upstream is receiving too many requests");
self.backoff(&refresh.token);
}
_ => {
info!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
}) {
let cache_key = cache_key(&refresh.token);
// No tokens left that access the environment of our current refresh. Deleting client features and engine cache
self.features_cache.remove(&cache_key);
self.engine_cache.remove(&cache_key);
}
}
FeatureError::NotFound => {
info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again");
self.backoff(&refresh.token);
}
}
}
EdgeError::ClientCacheError => {
info!("Couldn't refresh features, but will retry next go")
}
_ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
}
pub fn backoff(&self, token: &EdgeToken) {
self.tokens_to_refresh
.alter(&token.token, |_k, old_refresh| {
Expand Down
Loading

0 comments on commit a9e6c99

Please sign in to comment.