diff --git a/server/src/filters.rs b/server/src/filters.rs index 51b78d4a..af33502a 100644 --- a/server/src/filters.rs +++ b/server/src/filters.rs @@ -60,19 +60,22 @@ pub(crate) fn name_match_filter(name_prefix: String) -> FeatureFilter { Box::new(move |f| f.name.starts_with(&name_prefix)) } -pub(crate) fn project_filter(token: &EdgeToken) -> FeatureFilter { - let token = token.clone(); +pub(crate) fn project_filter_from_projects(projects: Vec) -> FeatureFilter { Box::new(move |feature| { if let Some(feature_project) = &feature.project { - token.projects.is_empty() - || token.projects.contains(&"*".to_string()) - || token.projects.contains(feature_project) + projects.is_empty() + || projects.contains(&"*".to_string()) + || projects.contains(feature_project) } else { false } }) } +pub(crate) fn project_filter(token: &EdgeToken) -> FeatureFilter { + project_filter_from_projects(token.projects.clone()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/server/src/http/broadcaster.rs b/server/src/http/broadcaster.rs index a1d21b50..f15f4bbe 100644 --- a/server/src/http/broadcaster.rs +++ b/server/src/http/broadcaster.rs @@ -16,7 +16,9 @@ use unleash_types::client_features::{ClientFeatures, Query}; use crate::{ error::EdgeError, feature_cache::{FeatureCache, UpdateType}, - filters::{filter_client_features, name_prefix_filter, FeatureFilter, FeatureFilterSet}, + filters::{ + filter_client_features, name_prefix_filter, project_filter_from_projects, FeatureFilterSet, + }, types::{EdgeJsonResult, EdgeResult, EdgeToken}, }; @@ -124,20 +126,23 @@ impl Broadcaster { async fn heartbeat(&self) { let mut active_connections = 0i64; for mut group in self.active_connections.iter_mut() { - let clients = std::mem::take(&mut group.clients); - let ok_clients = &mut group.clients; + let mut ok_clients = Vec::new(); - for ClientData { token, sender } in clients { + for ClientData { token, sender } in &group.clients { if sender .send(sse::Event::Comment("keep-alive".into())) .await .is_ok() { - ok_clients.push(ClientData { token, sender }); + ok_clients.push(ClientData { + token: token.clone(), + sender: sender.clone(), + }); } } active_connections += ok_clients.len() as i64; + group.clients = ok_clients; } CONNECTED_STREAMING_CLIENTS.set(active_connections) } @@ -191,7 +196,7 @@ impl Broadcaster { } else { FeatureFilterSet::default() } - .with_filter(project_filter(query.projects.clone())); + .with_filter(project_filter_from_projects(query.projects.clone())); filter_set } @@ -257,18 +262,6 @@ impl Broadcaster { } } -fn project_filter(projects: Vec) -> FeatureFilter { - Box::new(move |feature| { - if let Some(feature_project) = &feature.project { - projects.is_empty() - || projects.contains(&"*".to_string()) - || projects.contains(feature_project) - } else { - false - } - }) -} - #[cfg(test)] mod test { use tokio::time::timeout;