diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 3d8ef9d0..a19c9367 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -1,3 +1,4 @@ +use crate::cli::{EdgeArgs, EdgeMode}; use crate::error::EdgeError; use crate::feature_cache::FeatureCache; use crate::filters::{ @@ -45,12 +46,22 @@ pub async fn stream_features( edge_token: EdgeToken, broadcaster: Data, token_cache: Data>, + edge_mode: Data, filter_query: Query, ) -> EdgeResult { - let (validated_token, _filter_set, query) = - get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; + match edge_mode.get_ref() { + EdgeMode::Edge(EdgeArgs { + streaming: true, .. + }) => { + let (validated_token, _filter_set, query) = + get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; - broadcaster.connect(validated_token, query).await + broadcaster.connect(validated_token, query).await + } + _ => Err(EdgeError::Forbidden( + "This endpoint is only enabled in streaming mode".into(), + )), + } } #[utoipa::path( @@ -299,7 +310,7 @@ mod tests { use crate::auth::token_validator::TokenValidator; use crate::cli::{OfflineArgs, TokenHeader}; - use crate::http::unleash_client::{UnleashClient, ClientMetaInformation}; + use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; use crate::middleware; use crate::tests::{features_from_disk, upstream_server}; use actix_http::{Request, StatusCode}; diff --git a/server/src/error.rs b/server/src/error.rs index 9fde19e0..7faa6ecb 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -103,6 +103,7 @@ pub enum EdgeError { EdgeTokenError, EdgeTokenParseError, FeatureNotFound(String), + Forbidden(String), FrontendExpectedToBeHydrated(String), FrontendNotYetHydrated(FrontendHydrationMissing), HealthCheckError(String), @@ -212,6 +213,7 @@ impl Display for EdgeError { } EdgeError::InvalidTokenWithStrictBehavior => write!(f, "Edge is running with strict behavior and the token is not subsumed by any registered tokens"), EdgeError::SseError(message) => write!(f, "{}", message), + EdgeError::Forbidden(reason) => write!(f, "{}", reason), } } } @@ -253,6 +255,7 @@ impl ResponseError for EdgeError { EdgeError::NotReady => StatusCode::SERVICE_UNAVAILABLE, EdgeError::InvalidTokenWithStrictBehavior => StatusCode::FORBIDDEN, EdgeError::SseError(_) => StatusCode::INTERNAL_SERVER_ERROR, + EdgeError::Forbidden(_) => StatusCode::FORBIDDEN, } } diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index fd92feaa..8ca9e674 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -11,6 +11,7 @@ mod streaming_test { sync::Arc, }; use unleash_edge::{ + cli::{EdgeArgs, EdgeMode, TokenHeader}, feature_cache::FeatureCache, http::broadcaster::Broadcaster, tokens::cache_key, @@ -188,6 +189,36 @@ mod streaming_test { }); test_server(move || { + // the streaming endpoint doesn't work unless app data contains an EdgeMode::Edge with streaming: true + let edge_mode = EdgeMode::Edge(EdgeArgs { + streaming: true, + upstream_url: "".into(), + backup_folder: None, + metrics_interval_seconds: 60, + features_refresh_interval_seconds: 60, + token_revalidation_interval_seconds: 60, + tokens: vec!["".into()], + custom_client_headers: vec![], + skip_ssl_verification: false, + client_identity: None, + upstream_certificate_file: None, + upstream_request_timeout: 5, + upstream_socket_timeout: 5, + redis: None, + s3: None, + token_header: TokenHeader { + token_header: "".into(), + }, + strict: true, + dynamic: false, + delta: false, + prometheus_remote_write_url: None, + prometheus_push_interval: 60, + prometheus_username: None, + prometheus_password: None, + prometheus_user_id: None, + }); + let config = serde_qs::actix::QsQueryConfig::default() .qs_config(serde_qs::Config::new(5, false)); let metrics_cache = MetricsCache::default(); @@ -205,6 +236,7 @@ mod streaming_test { .app_data(web::Data::from(upstream_token_cache.clone())) .app_data(web::Data::new(metrics_cache)) .app_data(web::Data::new(connect_via)) + .app_data(web::Data::new(edge_mode)) .service( web::scope("/api") .configure(unleash_edge::client_api::configure_client_api)