Skip to content

Commit

Permalink
feat(1-3220): only push updates to listeners in the same environment (#…
Browse files Browse the repository at this point in the history
…620)

This PR updates the broadcaster to only push updates to listeners in the environment it got updates for. In other words, if you get an update for "development" config, you shouldn't push an update to listeners subscribed to the "production" environment.

This is the first step in more selective pushing (the next would be to do as we do in Unleash and hash the result of each query so that we only push out updates when something's actually changed).
  • Loading branch information
thomasheartman authored Jan 6, 2025
1 parent 7ecb244 commit 9de3014
Showing 1 changed file with 131 additions and 4 deletions.
135 changes: 131 additions & 4 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use unleash_types::client_features::{ClientFeatures, Query};

use crate::{
error::EdgeError,
feature_cache::FeatureCache,
feature_cache::{FeatureCache, UpdateType},
filters::{filter_client_features, name_prefix_filter, FeatureFilter, FeatureFilterSet},
types::{EdgeJsonResult, EdgeResult, EdgeToken},
};
Expand Down Expand Up @@ -108,7 +108,14 @@ impl Broadcaster {
tokio::spawn(async move {
while let Ok(key) = rx.recv().await {
debug!("Received update for key: {:?}", key);
this.broadcast().await;
match key {
UpdateType::Full(env) | UpdateType::Update(env) => {
this.broadcast(Some(env)).await;
}
UpdateType::Deletion => {
this.broadcast(None).await;
}
}
}
});
}
Expand Down Expand Up @@ -210,10 +217,16 @@ impl Broadcaster {
}

/// Broadcast new features to all clients.
pub async fn broadcast(&self) {
pub async fn broadcast(&self, environment: Option<String>) {
let mut client_events = Vec::new();

for entry in self.active_connections.iter() {
for entry in self.active_connections.iter().filter(|entry| {
if let Some(env) = &environment {
entry.key().environment == *env
} else {
true
}
}) {
let (query, group) = entry.pair();

let event_data = self
Expand Down Expand Up @@ -255,3 +268,117 @@ fn project_filter(projects: Vec<String>) -> FeatureFilter {
}
})
}

#[cfg(test)]
mod test {
use tokio::time::timeout;
use unleash_types::client_features::ClientFeature;

use crate::feature_cache::FeatureCache;

use super::*;

#[actix_web::test]
async fn only_updates_clients_in_same_env() {
let feature_cache = Arc::new(FeatureCache::default());
let broadcaster = Broadcaster::new(feature_cache.clone());

let env_with_updates = "production";
let env_without_updates = "development";
for env in &[env_with_updates, env_without_updates] {
feature_cache.insert(
env.to_string(),
ClientFeatures {
version: 0,
features: vec![],
query: None,
segments: None,
},
);
}

let mut rx = broadcaster
.create_connection(
StreamingQuery {
name_prefix: None,
environment: env_with_updates.into(),
projects: vec!["dx".to_string()],
},
"token",
)
.await
.expect("Failed to connect");

// Drain any initial events to start with a clean state
while let Ok(Some(_)) = timeout(Duration::from_secs(1), rx.recv()).await {
// ignored
}

feature_cache.insert(
env_with_updates.to_string(),
ClientFeatures {
version: 0,
features: vec![ClientFeature {
name: "flag-a".into(),
project: Some("dx".into()),
..Default::default()
}],
segments: None,
query: None,
},
);

if tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
if let Some(event) = rx.recv().await {
match event {
Event::Data(_) => {
// the only kind of data events we send at the moment are unleash-updated events. So if we receive a data event, we've got the update.
break;
}
_ => {
// ignore other events
}
}
}
}
})
.await
.is_err()
{
panic!("Test timed out waiting for update event");
}

feature_cache.insert(
env_without_updates.to_string(),
ClientFeatures {
version: 0,
features: vec![ClientFeature {
name: "flag-b".into(),
project: Some("dx".into()),
..Default::default()
}],
segments: None,
query: None,
},
);

let result = tokio::time::timeout(std::time::Duration::from_secs(1), async {
loop {
if let Some(event) = rx.recv().await {
match event {
Event::Data(_) => {
panic!("Received an update for an env I'm not subscribed to!");
}
_ => {
// ignore other events
}
}
}
}
})
.await;

assert!(result.is_err());
}
}

0 comments on commit 9de3014

Please sign in to comment.