Skip to content

Commit

Permalink
fix+refactor: JSON flattening, custom partition check (#1055)
Browse files Browse the repository at this point in the history
- Refactor parts of JSON flattening code to improve readability
and performance (less cloning).
- Fixes the custom partition check by also disallowing objects and arrays.
- Also adds tests and documentation.
  • Loading branch information
de-sh authored Dec 27, 2024
1 parent d332358 commit 603b095
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 387 deletions.
9 changes: 4 additions & 5 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,10 @@ impl Alert {
);
let deployment_id = storage::StorageMetadata::global().deployment_id;
let deployment_mode = storage::StorageMetadata::global().mode.to_string();
let additional_labels =
let mut additional_labels =
serde_json::to_value(rule).expect("rule is perfectly deserializable");
let flatten_additional_labels =
utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
.expect("can be flattened");
utils::json::flatten::flatten_with_parent_prefix(&mut additional_labels, "rule", "_")
.expect("can be flattened");
Context::new(
stream_name,
AlertInfo::new(
Expand All @@ -122,7 +121,7 @@ impl Alert {
alert_state,
),
DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
flatten_additional_labels,
additional_labels,
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub async fn create_stream_if_not_exists(
super::logstream::create_stream(
stream_name.to_string(),
"",
"",
None,
"",
"",
Arc::new(Schema::empty()),
Expand Down
9 changes: 6 additions & 3 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use http::{HeaderName, HeaderValue};
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
Expand Down Expand Up @@ -471,7 +472,7 @@ fn remove_id_from_alerts(value: &mut Value) {
pub async fn create_stream(
stream_name: String,
time_partition: &str,
time_partition_limit: &str,
time_partition_limit: Option<NonZeroU32>,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
Expand Down Expand Up @@ -511,7 +512,7 @@ pub async fn create_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
time_partition_limit.to_string(),
time_partition_limit,
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
Expand Down Expand Up @@ -561,7 +562,9 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta.time_partition_limit.clone(),
time_partition_limit: stream_meta
.time_partition_limit
.map(|limit| limit.to_string()),
custom_partition: stream_meta.custom_partition.clone(),
cache_enabled: stream_meta.cache_enabled,
static_schema_flag: stream_meta.static_schema_flag.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub async fn push_logs(
let data = convert_array_to_object(
&body_val,
time_partition.as_ref(),
time_partition_limit.as_ref(),
time_partition_limit,
None,
)?;
for value in data {
Expand All @@ -135,7 +135,7 @@ pub async fn push_logs(
let data = convert_array_to_object(
&body_val,
time_partition.as_ref(),
time_partition_limit.as_ref(),
time_partition_limit,
custom_partition.as_ref(),
)?;
let custom_partition = custom_partition.unwrap();
Expand Down
23 changes: 11 additions & 12 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ pub async fn create_update_stream(
}

let time_partition_in_days = if !time_partition_limit.is_empty() {
validate_time_partition_limit(&time_partition_limit)?
Some(validate_time_partition_limit(&time_partition_limit)?)
} else {
""
None
};

if !custom_partition.is_empty() {
Expand Down Expand Up @@ -207,20 +207,20 @@ pub fn fetch_headers_from_put_stream_request(

pub fn validate_time_partition_limit(
time_partition_limit: &str,
) -> Result<&str, CreateStreamError> {
) -> Result<NonZeroU32, CreateStreamError> {
if !time_partition_limit.ends_with('d') {
return Err(CreateStreamError::Custom {
msg: "Missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
if days.parse::<NonZeroU32>().is_err() {
let Ok(days) = days.parse::<NonZeroU32>() else {
return Err(CreateStreamError::Custom {
msg: "Could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
};

Ok(days)
}
Expand Down Expand Up @@ -288,7 +288,7 @@ pub fn validate_static_schema(

pub async fn update_time_partition_limit_in_stream(
stream_name: String,
time_partition_limit: &str,
time_partition_limit: NonZeroU32,
) -> Result<(), CreateStreamError> {
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
Expand All @@ -299,7 +299,7 @@ pub async fn update_time_partition_limit_in_stream(
}

if metadata::STREAM_INFO
.update_time_partition_limit(&stream_name, time_partition_limit.to_string())
.update_time_partition_limit(&stream_name, time_partition_limit)
.is_err()
{
return Err(CreateStreamError::Custom {
Expand Down Expand Up @@ -381,7 +381,7 @@ pub async fn update_custom_partition_in_stream(
pub async fn create_stream(
stream_name: String,
time_partition: &str,
time_partition_limit: &str,
time_partition_limit: Option<NonZeroU32>,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
Expand Down Expand Up @@ -421,7 +421,7 @@ pub async fn create_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
time_partition_limit.to_string(),
time_partition_limit,
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
Expand Down Expand Up @@ -470,8 +470,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
let time_partition = stream_metadata.time_partition.as_deref().unwrap_or("");
let time_partition_limit = stream_metadata
.time_partition_limit
.as_deref()
.unwrap_or("");
.and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
Expand All @@ -480,7 +479,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
stream_name.to_string(),
stream_metadata.created_at,
time_partition.to_string(),
time_partition_limit.to_string(),
time_partition_limit,
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
Expand Down
25 changes: 13 additions & 12 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use itertools::Itertools;
use once_cell::sync::Lazy;
use serde_json::Value;
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::{Arc, RwLock};

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
Expand Down Expand Up @@ -53,7 +54,7 @@ pub struct LogStreamMetadata {
pub created_at: String,
pub first_event_at: Option<String>,
pub time_partition: Option<String>,
pub time_partition_limit: Option<String>,
pub time_partition_limit: Option<NonZeroU32>,
pub custom_partition: Option<String>,
pub static_schema_flag: Option<String>,
pub hot_tier_enabled: Option<bool>,
Expand Down Expand Up @@ -113,11 +114,11 @@ impl StreamInfo {
pub fn get_time_partition_limit(
&self,
stream_name: &str,
) -> Result<Option<String>, MetadataError> {
) -> Result<Option<NonZeroU32>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.time_partition_limit.clone())
.map(|metadata| metadata.time_partition_limit)
}

pub fn get_custom_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
Expand Down Expand Up @@ -202,7 +203,7 @@ impl StreamInfo {
pub fn update_time_partition_limit(
&self,
stream_name: &str,
time_partition_limit: String,
time_partition_limit: NonZeroU32,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
Expand Down Expand Up @@ -244,7 +245,7 @@ impl StreamInfo {
stream_name: String,
created_at: String,
time_partition: String,
time_partition_limit: String,
time_partition_limit: Option<NonZeroU32>,
custom_partition: String,
static_schema_flag: String,
static_schema: HashMap<String, Arc<Field>>,
Expand All @@ -262,11 +263,7 @@ impl StreamInfo {
} else {
Some(time_partition)
},
time_partition_limit: if time_partition_limit.is_empty() {
None
} else {
Some(time_partition_limit)
},
time_partition_limit,
custom_partition: if custom_partition.is_empty() {
None
} else {
Expand Down Expand Up @@ -320,7 +317,9 @@ impl StreamInfo {
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
time_partition_limit: meta.time_partition_limit,
time_partition_limit: meta
.time_partition_limit
.and_then(|limit| limit.parse().ok()),
custom_partition: meta.custom_partition,
static_schema_flag: meta.static_schema_flag,
hot_tier_enabled: meta.hot_tier_enabled,
Expand Down Expand Up @@ -473,7 +472,9 @@ pub async fn load_stream_metadata_on_server_start(
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
time_partition_limit: meta.time_partition_limit,
time_partition_limit: meta
.time_partition_limit
.and_then(|limit| limit.parse().ok()),
custom_partition: meta.custom_partition,
static_schema_flag: meta.static_schema_flag,
hot_tier_enabled: meta.hot_tier_enabled,
Expand Down
17 changes: 5 additions & 12 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use relative_path::RelativePathBuf;
use tracing::error;

use std::collections::BTreeMap;
use std::num::NonZeroU32;
use std::{
collections::HashMap,
fs,
Expand Down Expand Up @@ -145,7 +146,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
&self,
stream_name: &str,
time_partition: &str,
time_partition_limit: &str,
time_partition_limit: Option<NonZeroU32>,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
Expand All @@ -162,11 +163,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
} else {
format.time_partition = Some(time_partition.to_string());
}
if time_partition_limit.is_empty() {
format.time_partition_limit = None;
} else {
format.time_partition_limit = Some(time_partition_limit.to_string());
}
format.time_partition_limit = time_partition_limit.map(|limit| limit.to_string());
if custom_partition.is_empty() {
format.custom_partition = None;
} else {
Expand All @@ -190,14 +187,10 @@ pub trait ObjectStorage: Send + Sync + 'static {
async fn update_time_partition_limit_in_stream(
&self,
stream_name: &str,
time_partition_limit: &str,
time_partition_limit: NonZeroU32,
) -> Result<(), ObjectStorageError> {
let mut format = self.get_object_store_format(stream_name).await?;
if time_partition_limit.is_empty() {
format.time_partition_limit = None;
} else {
format.time_partition_limit = Some(time_partition_limit.to_string());
}
format.time_partition_limit = Some(time_partition_limit.to_string());
let format_json = to_bytes(&format);
self.put_object(&stream_json_path(stream_name), format_json)
.await?;
Expand Down
Loading

0 comments on commit 603b095

Please sign in to comment.