Skip to content

Commit

Permalink
refactor: use static MetaField schema for incr query
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Jan 20, 2025
1 parent 4575ccd commit 1131a41
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ serde_json = { version = "1.0" }
thiserror = { version = "2.0.10" }
bytes = { version = "1" }
chrono = { version = "0.4" }
lazy_static = { version = "1.5.0" }
log = { version = "0.4" }
paste = { version = "1.0.15" }
strum = { version = "0.26", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
paste = { workspace = true }
strum = { workspace = true }
Expand Down
29 changes: 29 additions & 0 deletions crates/core/src/metadata/meta_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
use crate::error::CoreError;
use crate::Result;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use lazy_static::lazy_static;
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetaField {
Expand Down Expand Up @@ -66,11 +69,37 @@ impl FromStr for MetaField {
}
}

lazy_static! {
    /// Schema of the Hudi meta columns prepended to every record:
    /// commit time, commit seqno, record key, partition path, and file name,
    /// all declared as non-nullable UTF-8 fields.
    static ref SCHEMA: Arc<Schema> = Arc::new(Schema::new(vec![
        Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false),
        Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
        Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false),
        Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false),
        Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false),
    ]));

    /// Same as [`SCHEMA`] with the operation meta field appended.
    /// Built from `SCHEMA` so the shared field list is defined exactly once
    /// and the two schemas cannot drift apart.
    static ref SCHEMA_WITH_OPERATION: Arc<Schema> = {
        let mut fields: Vec<Field> = SCHEMA
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        fields.push(Field::new(MetaField::Operation.as_ref(), DataType::Utf8, false));
        Arc::new(Schema::new(fields))
    };
}

impl MetaField {
#[inline]
pub fn field_index(&self) -> usize {
self.clone() as usize
}

pub fn schema() -> SchemaRef {
SCHEMA.clone()
}

pub fn schema_with_operation() -> SchemaRef {
SCHEMA_WITH_OPERATION.clone()
}
}

#[cfg(test)]
Expand Down
15 changes: 9 additions & 6 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ use crate::table::partition::PartitionPruner;
use crate::timeline::Timeline;
use crate::Result;

use crate::metadata::meta_field::MetaField;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -263,16 +264,16 @@ impl Table {
)
}

pub async fn create_file_group_reader_with_filters(
pub fn create_file_group_reader_with_filters(
&self,
filters: &[Filter],
schema: &Schema,
) -> Result<FileGroupReader> {
let schema = self.get_schema().await?;
FileGroupReader::new_with_filters(
self.file_system_view.storage.clone(),
self.hudi_configs.clone(),
filters,
&schema,
schema,
)
}

Expand Down Expand Up @@ -335,10 +336,12 @@ impl Table {

// Read incremental records from the file slices.
let filters = &[
FilterField::new("_hoodie_commit_time").gt(start_timestamp),
FilterField::new("_hoodie_commit_time").lte(as_of_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
];
let fg_reader = self.create_file_group_reader_with_filters(filters).await?;
let fg_reader = self
.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())
.await?;
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
let batches = futures::future::try_join_all(
file_slices
Expand Down

0 comments on commit 1131a41

Please sign in to comment.