Skip to content

Commit

Permalink
feat: added client authenticated bulk metrics (#373)
Browse files Browse the repository at this point in the history
* filters metrics posted by environment accessible by token
  • Loading branch information
Christopher Kolstad authored Jan 10, 2024
1 parent 5a3db0e commit 1edda8c
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 19 deletions.
2 changes: 1 addition & 1 deletion server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn build_edge(args: &EdgeArgs) -> EdgeResult<EdgeInfo> {
args.upstream_certificate_file.clone(),
Duration::seconds(args.upstream_request_timeout),
Duration::seconds(args.upstream_socket_timeout),
args.token_header.token_header.clone()
args.token_header.token_header.clone(),
)
})
.map(|c| c.with_custom_client_headers(args.custom_client_headers.clone()))
Expand Down
1 change: 0 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ pub struct TokenHeader {
/// Token header to use for edge authorization.
#[clap(long, env, global = true, default_value = "Authorization")]
pub token_header: String,

}

impl FromStr for TokenHeader {
Expand Down
149 changes: 141 additions & 8 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::filters::{
use crate::http::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::tokens::cache_key;
use crate::types::{self, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters};
use crate::types::{
self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters,
};
use actix_web::web::{self, Data, Json, Query};
use actix_web::{get, post, HttpRequest, HttpResponse};
use dashmap::DashMap;
Expand Down Expand Up @@ -201,6 +203,32 @@ pub async fn metrics(
Ok(HttpResponse::Accepted().finish())
}

#[utoipa::path(
context_path = "/api/client",
responses(
(status = 202, description = "Accepted bulk metrics"),
(status = 403, description = "Was not allowed to post bulk metrics")
),
request_body = BatchMetricsRequestBody,
security(
("Authorization" = [])
)
)]
#[post("/metrics/bulk")]
pub async fn post_bulk_metrics(
edge_token: EdgeToken,
bulk_metrics: Json<BatchMetricsRequestBody>,
connect_via: Data<ConnectVia>,
metrics_cache: Data<MetricsCache>,
) -> EdgeResult<HttpResponse> {
crate::metrics::client_metrics::register_bulk_metrics(
metrics_cache.get_ref(),
connect_via.get_ref(),
&edge_token,
bulk_metrics.into_inner(),
);
Ok(HttpResponse::Accepted().finish())
}
pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/client")
Expand All @@ -210,7 +238,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
.service(get_features)
.service(get_feature)
.service(register)
.service(metrics),
.service(metrics)
.service(post_bulk_metrics),
);
}

Expand All @@ -226,12 +255,12 @@ pub fn configure_experimental_post_features(
#[cfg(test)]
mod tests {

use crate::metrics::client_metrics::{ApplicationKey, MetricsBatch, MetricsKey};
use crate::types::{TokenType, TokenValidationStatus};
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

use crate::metrics::client_metrics::{ApplicationKey, MetricsKey};
use crate::types::{TokenType, TokenValidationStatus};
use std::sync::Arc;

use super::*;

Expand All @@ -245,14 +274,16 @@ mod tests {
http::header::ContentType,
test,
web::{self, Data},
App,
App, ResponseError,
};
use chrono::{DateTime, Duration, TimeZone, Utc};
use maplit::hashmap;
use reqwest::StatusCode;
use ulid::Ulid;
use unleash_types::client_features::{ClientFeature, Constraint, Operator, Strategy};
use unleash_types::client_metrics::{ClientMetricsEnv, MetricBucket, ToggleStats};
use unleash_types::client_metrics::{
ClientMetricsEnv, ConnectViaBuilder, MetricBucket, ToggleStats,
};
use unleash_yggdrasil::EngineState;

async fn make_metrics_post_request() -> Request {
Expand Down Expand Up @@ -282,6 +313,38 @@ mod tests {
.to_request()
}

async fn make_bulk_metrics_post_request(authorization: Option<String>) -> Request {
let mut req = test::TestRequest::post()
.uri("/api/client/metrics/bulk")
.insert_header(ContentType::json());
req = match authorization {
Some(auth) => req.insert_header(("Authorization", auth)),
None => req,
};
req.set_json(Json(BatchMetricsRequestBody {
applications: vec![ClientApplication {
app_name: "test_app".to_string(),
connect_via: None,
environment: None,
instance_id: None,
interval: 10,
sdk_version: None,
started: Default::default(),
strategies: vec![],
}],
metrics: vec![ClientMetricsEnv {
feature_name: "".to_string(),
app_name: "".to_string(),
environment: "".to_string(),
timestamp: Default::default(),
yes: 0,
no: 0,
variants: Default::default(),
}],
}))
.to_request()
}

async fn make_register_post_request(application: ClientApplication) -> Request {
test::TestRequest::post()
.uri("/api/client/register")
Expand Down Expand Up @@ -478,6 +541,76 @@ mod tests {
assert_eq!(saved_app.connect_via, Some(vec![our_app]));
}

#[tokio::test]
async fn bulk_metrics_endpoint_correctly_accepts_data() {
let metrics_cache = MetricsCache::default();
let connect_via = ConnectViaBuilder::default()
.app_name("unleash-edge".into())
.instance_id("test".into())
.build()
.unwrap();
let app = test::init_service(
App::new()
.app_data(Data::new(connect_via))
.app_data(web::Data::new(metrics_cache))
.service(web::scope("/api/client").service(post_bulk_metrics)),
)
.await;
let token = EdgeToken::from_str("*:development.somestring").unwrap();
let req = make_bulk_metrics_post_request(Some(token.token.clone())).await;
let call = test::call_service(&app, req).await;
assert_eq!(call.status(), StatusCode::ACCEPTED);
}
#[tokio::test]
async fn bulk_metrics_endpoint_correctly_refuses_metrics_without_auth_header() {
let mut token = EdgeToken::from_str("*:development.somestring").unwrap();
token.status = TokenValidationStatus::Validated;
token.token_type = Some(TokenType::Client);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(token.token.clone(), token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
let status = client
.send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), None)
.await;
assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN);
let successful = client
.send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), Some(token.clone()))
.await;
assert!(successful.is_ok());
}

#[tokio::test]
async fn bulk_metrics_endpoint_correctly_refuses_metrics_with_frontend_token() {
let mut frontend_token = EdgeToken::from_str("*:development.frontend").unwrap();
frontend_token.status = TokenValidationStatus::Validated;
frontend_token.token_type = Some(TokenType::Frontend);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_engine_cache,
)
.await;
let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
let status = client
.send_bulk_metrics_to_client_endpoint(
MetricsBatch::default(),
Some(frontend_token.clone()),
)
.await;
assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN);
}
#[tokio::test]
async fn register_endpoint_returns_version_header() {
let metrics_cache = Arc::new(MetricsCache::default());
Expand Down
9 changes: 8 additions & 1 deletion server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,14 @@ mod tests {
server.stop().await;
tokio::time::sleep(std::time::Duration::from_millis(5)).await; // To ensure our refresh is due
feature_refresher.refresh_features().await;
assert_eq!(feature_refresher.tokens_to_refresh.get("*:development.secret123").unwrap().failure_count, 1);
assert_eq!(
feature_refresher
.tokens_to_refresh
.get("*:development.secret123")
.unwrap()
.failure_count,
1
);
assert!(!feature_refresher.features_cache.is_empty());
assert!(!feature_refresher.engine_cache.is_empty());
}
Expand Down
33 changes: 31 additions & 2 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl UnleashClient {
service_account_token: String,
connect_timeout: Duration,
socket_timeout: Duration,
token_header: String
token_header: String,
) -> Self {
Self {
urls: UnleashUrls::from_base_url(server_url),
Expand Down Expand Up @@ -300,7 +300,7 @@ impl UnleashClient {

fn header_map(&self, api_key: Option<String>) -> HeaderMap {
let mut header_map = HeaderMap::new();
let token_header: HeaderName= HeaderName::from_str(self.token_header.as_str()).unwrap();
let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap();
if let Some(key) = api_key {
header_map.insert(token_header, key.parse().unwrap());
}
Expand Down Expand Up @@ -450,6 +450,35 @@ impl UnleashClient {
}
}

pub async fn send_bulk_metrics_to_client_endpoint(
&self,
request: MetricsBatch,
token: Option<EdgeToken>,
) -> EdgeResult<()> {
let result = self
.backing_client
.post(self.urls.client_bulk_metrics_url.to_string())
.headers(self.header_map(token.map(|t| t.token)))
.json(&request)
.send()
.await
.map_err(|e| {
info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}");
EdgeError::EdgeMetricsError
})?;
if result.status().is_success() {
Ok(())
} else {
match result.status() {
StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
result.status(),
result.json().await.ok(),
)),
_ => Err(EdgeMetricsRequestError(result.status(), None)),
}
}
}

pub async fn forward_request_for_client_token(
&self,
client_token_request: ClientTokenRequest,
Expand Down
10 changes: 9 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ mod tests {
use actix_web::{web, App};
use dashmap::DashMap;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use unleash_yggdrasil::EngineState;

use crate::auth::token_validator::TokenValidator;
use crate::metrics::client_metrics::MetricsCache;
use crate::types::EdgeToken;

pub fn features_from_disk(path: &str) -> ClientFeatures {
Expand All @@ -68,14 +70,20 @@ mod tests {
test_server(move || {
let config = serde_qs::actix::QsQueryConfig::default()
.qs_config(serde_qs::Config::new(5, false));

let metrics_cache = MetricsCache::default();
let connect_via = ConnectVia {
app_name: "edge".into(),
instance_id: "testinstance".into(),
};
HttpService::new(map_config(
App::new()
.app_data(config)
.app_data(web::Data::from(token_validator.clone()))
.app_data(web::Data::from(upstream_features_cache.clone()))
.app_data(web::Data::from(upstream_engine_cache.clone()))
.app_data(web::Data::from(upstream_token_cache.clone()))
.app_data(web::Data::new(metrics_cache))
.app_data(web::Data::new(connect_via))
.service(
web::scope("/api")
.configure(crate::client_api::configure_client_api)
Expand Down
Loading

0 comments on commit 1edda8c

Please sign in to comment.