diff --git a/Cargo.toml b/Cargo.toml index 976efea..ab764ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ serde_json = { version = "1.0" } thiserror = { version = "2.0.10" } bytes = { version = "1" } chrono = { version = "0.4" } +lazy_static = { version = "1.5.0" } log = { version = "0.4" } paste = { version = "1.0.15" } strum = { version = "0.26", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index fe8c79d..b835b43 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -51,6 +51,7 @@ serde_json = { workspace = true } thiserror = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +lazy_static = { workspace = true } log = { workspace = true } paste = { workspace = true } strum = { workspace = true } diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs index 240aeff..9b80088 100644 --- a/crates/core/src/metadata/meta_field.rs +++ b/crates/core/src/metadata/meta_field.rs @@ -18,8 +18,11 @@ */ use crate::error::CoreError; use crate::Result; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use lazy_static::lazy_static; use std::fmt::Display; use std::str::FromStr; +use std::sync::Arc; #[derive(Debug, Clone, PartialEq, Eq)] pub enum MetaField { @@ -66,11 +69,37 @@ impl FromStr for MetaField { } } +lazy_static! { + static ref SCHEMA: Arc = Arc::new(Schema::new(vec![ + Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false), + Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false), + Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false), + Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false), + Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false), + ])); + static ref SCHEMA_WITH_OPERATION: Arc = Arc::new(Schema::new(vec![ + Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false), + Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false), + Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false), + Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false), + Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false), + Field::new(MetaField::Operation.as_ref(), DataType::Utf8, false), + ])); +} + impl MetaField { #[inline] pub fn field_index(&self) -> usize { self.clone() as usize } + + pub fn schema() -> SchemaRef { + SCHEMA.clone() + } + + pub fn schema_with_operation() -> SchemaRef { + SCHEMA_WITH_OPERATION.clone() + } } #[cfg(test)] diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index c1db54d..e11051c 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -104,6 +104,7 @@ use crate::table::partition::PartitionPruner; use crate::timeline::Timeline; use crate::Result; +use crate::metadata::meta_field::MetaField; use arrow::record_batch::RecordBatch; use arrow_schema::{Field, Schema}; use std::collections::{HashMap, HashSet}; @@ -263,16 +264,16 @@ impl Table { ) } - pub async fn create_file_group_reader_with_filters( + pub fn create_file_group_reader_with_filters( &self, filters: &[Filter], + schema: &Schema, ) -> Result { - let schema = self.get_schema().await?; FileGroupReader::new_with_filters( self.file_system_view.storage.clone(), self.hudi_configs.clone(), filters, - &schema, + schema, ) } @@ -335,10 +336,12 @@ impl Table { // Read incremental records from the file slices. let filters = &[ - FilterField::new("_hoodie_commit_time").gt(start_timestamp), - FilterField::new("_hoodie_commit_time").lte(as_of_timestamp), + FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp), + FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp), ]; - let fg_reader = self.create_file_group_reader_with_filters(filters).await?; + let fg_reader = self + .create_file_group_reader_with_filters(filters, MetaField::schema().as_ref()) + .await?; let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite; let batches = futures::future::try_join_all( file_slices