Skip to content

Commit

Permalink
feat(1-3220): remove feature code
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasheartman committed Jan 6, 2025
1 parent 4dd3e24 commit c26e69e
Showing 1 changed file with 4 additions and 131 deletions.
135 changes: 4 additions & 131 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use unleash_types::client_features::{ClientFeatures, Query};

use crate::{
error::EdgeError,
feature_cache::{FeatureCache, UpdateType},
feature_cache::FeatureCache,
filters::{filter_client_features, name_prefix_filter, FeatureFilter, FeatureFilterSet},
types::{EdgeJsonResult, EdgeResult, EdgeToken},
};
Expand Down Expand Up @@ -108,14 +108,7 @@ impl Broadcaster {
tokio::spawn(async move {
while let Ok(key) = rx.recv().await {
debug!("Received update for key: {:?}", key);
match key {
UpdateType::Full(env) | UpdateType::Update(env) => {
this.broadcast(Some(env)).await;
}
UpdateType::Deletion => {
this.broadcast(None).await;
}
}
this.broadcast().await;
}
});
}
Expand Down Expand Up @@ -217,16 +210,10 @@ impl Broadcaster {
}

/// Broadcast new features to all clients.
pub async fn broadcast(&self, environment: Option<String>) {
pub async fn broadcast(&self) {
let mut client_events = Vec::new();

for entry in self.active_connections.iter().filter(|entry| {
if let Some(env) = &environment {
entry.key().environment == *env
} else {
true
}
}) {
for entry in self.active_connections.iter() {
let (query, group) = entry.pair();

let event_data = self
Expand Down Expand Up @@ -268,117 +255,3 @@ fn project_filter(projects: Vec<String>) -> FeatureFilter {
}
})
}

#[cfg(test)]
mod test {
use tokio::time::timeout;
use unleash_types::client_features::ClientFeature;

use crate::feature_cache::FeatureCache;

use super::*;

#[actix_web::test]
async fn only_updates_clients_in_same_env() {
let feature_cache = Arc::new(FeatureCache::default());
let broadcaster = Broadcaster::new(feature_cache.clone());

let env_with_updates = "production";
let env_without_updates = "development";
for env in &[env_with_updates, env_without_updates] {
feature_cache.insert(
env.to_string(),
ClientFeatures {
version: 0,
features: vec![],
query: None,
segments: None,
},
);
}

let mut rx = broadcaster
.create_connection(
StreamingQuery {
name_prefix: None,
environment: env_with_updates.into(),
projects: vec!["dx".to_string()],
},
"token",
)
.await
.expect("Failed to connect");

// Drain any initial events to start with a clean state
while let Ok(Some(_)) = timeout(Duration::from_secs(1), rx.recv()).await {
// ignored
}

feature_cache.insert(
env_with_updates.to_string(),
ClientFeatures {
version: 0,
features: vec![ClientFeature {
name: "flag-a".into(),
project: Some("dx".into()),
..Default::default()
}],
segments: None,
query: None,
},
);

if tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
if let Some(event) = rx.recv().await {
match event {
Event::Data(_) => {
// the only kind of data events we send at the moment are unleash-updated events. So if we receive a data event, we've got the update.
break;
}
_ => {
// ignore other events
}
}
}
}
})
.await
.is_err()
{
panic!("Test timed out waiting for update event");
}

feature_cache.insert(
env_without_updates.to_string(),
ClientFeatures {
version: 0,
features: vec![ClientFeature {
name: "flag-b".into(),
project: Some("dx".into()),
..Default::default()
}],
segments: None,
query: None,
},
);

let result = tokio::time::timeout(std::time::Duration::from_secs(1), async {
loop {
if let Some(event) = rx.recv().await {
match event {
Event::Data(_) => {
panic!("Received an update for an env I'm not subscribed to!");
}
_ => {
// ignore other events
}
}
}
}
})
.await;

assert!(result.is_err());
}
}

0 comments on commit c26e69e

Please sign in to comment.