Skip to content

Commit

Permalink
refactor: improve file listing flow
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Jan 20, 2025
1 parent 3dc5fe2 commit 8b767d6
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 228 deletions.
23 changes: 7 additions & 16 deletions crates/core/src/file_group/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> Result<HashSet
.as_array()
.ok_or_else(|| CoreError::CommitMetadata("Invalid write stats array".into()))?;

let partition = (!partition.is_empty()).then(|| partition.to_string());

for stat in write_stats {
let file_id = stat
.get("fileId")
Expand Down Expand Up @@ -85,8 +83,6 @@ pub fn build_replaced_file_groups(
.as_array()
.ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids array".into()))?;

let partition = (!partition.is_empty()).then(|| partition.to_string());

for file_id in file_ids {
let id = file_id
.as_str()
Expand Down Expand Up @@ -269,17 +265,15 @@ mod tests {
"byteField=20/shortField=100",
"byteField=10/shortField=300",
]);
let actual_partitions = HashSet::<&str>::from_iter(
file_groups
.iter()
.map(|fg| fg.partition_path.as_ref().unwrap().as_str()),
);
let actual_partitions =
HashSet::<&str>::from_iter(file_groups.iter().map(|fg| fg.partition_path.as_str()));
assert_eq!(actual_partitions, expected_partitions);
}
}

mod test_build_replaced_file_groups {
use super::super::*;
use crate::table::partition::EMPTY_PARTITION_PATH;
use serde_json::{json, Map, Value};

#[test]
Expand Down Expand Up @@ -369,7 +363,7 @@ mod tests {
let file_groups = result.unwrap();
assert_eq!(file_groups.len(), 1);
let file_group = file_groups.iter().next().unwrap();
assert!(file_group.partition_path.is_none());
assert_eq!(file_group.partition_path, EMPTY_PARTITION_PATH);
}

#[test]
Expand All @@ -391,7 +385,7 @@ mod tests {
let file_groups = result.unwrap();
let actual_partition_paths = file_groups
.iter()
.map(|fg| fg.partition_path.as_ref().unwrap().as_str())
.map(|fg| fg.partition_path.as_str())
.collect::<Vec<_>>();
assert_eq!(actual_partition_paths, &["20", "20"]);
}
Expand Down Expand Up @@ -432,11 +426,8 @@ mod tests {
assert_eq!(file_groups.len(), 3);

let expected_partitions = HashSet::from_iter(vec!["10", "20", "30"]);
let actual_partitions = HashSet::<&str>::from_iter(
file_groups
.iter()
.map(|fg| fg.partition_path.as_ref().unwrap().as_str()),
);
let actual_partitions =
HashSet::<&str>::from_iter(file_groups.iter().map(|fg| fg.partition_path.as_str()));
assert_eq!(actual_partitions, expected_partitions);
}
}
Expand Down
12 changes: 3 additions & 9 deletions crates/core/src/file_group/file_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use std::path::PathBuf;
pub struct FileSlice {
pub base_file: BaseFile,
pub log_files: BTreeSet<LogFile>,
pub partition_path: Option<String>,
pub partition_path: String,
}

impl FileSlice {
pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
pub fn new(base_file: BaseFile, partition_path: String) -> Self {
Self {
base_file,
log_files: BTreeSet::new(),
Expand All @@ -43,7 +43,7 @@ impl FileSlice {
}

fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
let path = PathBuf::from(self.partition_path()).join(file_name);
let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
path.to_str().map(|s| s.to_string()).ok_or_else(|| {
CoreError::FileGroup(format!("Failed to get relative path for file: {file_name}",))
})
Expand All @@ -67,12 +67,6 @@ impl FileSlice {
&self.base_file.file_id
}

/// Returns the partition path of the [FileSlice].
#[inline]
pub fn partition_path(&self) -> &str {
self.partition_path.as_deref().unwrap_or_default()
}

/// Returns the instant time that marks the [FileSlice] creation.
///
/// This is also an instant time stored in the [Timeline].
Expand Down
29 changes: 18 additions & 11 deletions crates/core/src/file_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::str::FromStr;
#[derive(Clone, Debug)]
pub struct FileGroup {
pub file_id: String,
pub partition_path: Option<String>,
pub partition_path: String,
pub file_slices: BTreeMap<String, FileSlice>,
}

Expand All @@ -64,7 +64,7 @@ impl fmt::Display for FileGroup {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str(
format!(
"File Group: partition {:?} id {}",
"File Group: partition={}, id={}",
&self.partition_path, &self.file_id
)
.as_str(),
Expand All @@ -73,7 +73,7 @@ impl fmt::Display for FileGroup {
}

impl FileGroup {
pub fn new(file_id: String, partition_path: Option<String>) -> Self {
pub fn new(file_id: String, partition_path: String) -> Self {
Self {
file_id,
partition_path,
Expand All @@ -83,7 +83,7 @@ impl FileGroup {

pub fn new_with_base_file_name(
id: String,
partition_path: Option<String>,
partition_path: String,
file_name: &str,
) -> Result<Self> {
let mut file_group = Self::new(id, partition_path);
Expand Down Expand Up @@ -175,18 +175,22 @@ impl FileGroup {
#[cfg(test)]
mod tests {
use super::*;
use crate::table::partition::EMPTY_PARTITION_PATH;

#[test]
fn load_a_valid_file_group() {
let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
let mut fg = FileGroup::new(
"5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
EMPTY_PARTITION_PATH.to_string(),
);
let _ = fg.add_base_file_from_name(
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
);
let _ = fg.add_base_file_from_name(
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
);
assert_eq!(fg.file_slices.len(), 2);
assert!(fg.partition_path.is_none());
assert_eq!(fg.partition_path, EMPTY_PARTITION_PATH);
let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
assert_eq!(
Expand All @@ -201,7 +205,10 @@ mod tests {

#[test]
fn add_base_file_with_same_commit_time_should_fail() {
let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
let mut fg = FileGroup::new(
"5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
EMPTY_PARTITION_PATH.to_string(),
);
let res1 = fg.add_base_file_from_name(
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
);
Expand All @@ -217,28 +224,28 @@ mod tests {
fn test_file_group_display() {
let file_group = FileGroup {
file_id: "group123".to_string(),
partition_path: Some("part/2023-01-01".to_string()),
partition_path: "part/2023-01-01".to_string(),
file_slices: BTreeMap::new(),
};

let display_string = format!("{}", file_group);

assert_eq!(
display_string,
"File Group: partition Some(\"part/2023-01-01\") id group123"
"File Group: partition=part/2023-01-01, id=group123"
);

let file_group_no_partition = FileGroup {
file_id: "group456".to_string(),
partition_path: None,
partition_path: EMPTY_PARTITION_PATH.to_string(),
file_slices: BTreeMap::new(),
};

let display_string_no_partition = format!("{}", file_group_no_partition);

assert_eq!(
display_string_no_partition,
"File Group: partition None id group456"
"File Group: partition=, id=group456"
);
}
}
10 changes: 10 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@
* under the License.
*/
pub mod meta_field;

/// Directory name under which Hudi keeps its table metadata.
pub const HUDI_METADATA_DIR: &str = ".hoodie";
/// Directory name under which Delta Lake keeps its transaction log.
pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
/// Directory name under which Iceberg keeps its table metadata.
pub const ICEBERG_METADATA_DIR: &str = "metadata";

/// All known lake-format metadata directory names, gathered in one place.
///
/// NOTE(review): presumably used to recognize and skip metadata directories
/// while listing data files or partitions — confirm against the callers.
pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
HUDI_METADATA_DIR,
DELTALAKE_METADATA_DIR,
ICEBERG_METADATA_DIR,
];
Loading

0 comments on commit 8b767d6

Please sign in to comment.