diff --git a/Cargo.lock b/Cargo.lock index 5003fac387013..4c18b2312cff7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2052,7 +2052,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "log", @@ -6228,8 +6228,8 @@ dependencies = [ [[package]] name = "iceberg" -version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21#2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" +version = "0.4.0" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=683fb89edeaf8d1baae69e1f376d68b92be1d496#683fb89edeaf8d1baae69e1f376d68b92be1d496" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6256,7 +6256,7 @@ dependencies = [ "murmur3", "num-bigint", "once_cell", - "opendal 0.50.1", + "opendal 0.51.0", "ordered-float 4.1.1", "parquet 53.2.0", "paste", @@ -6273,12 +6273,13 @@ dependencies = [ "typed-builder 0.20.0", "url", "uuid", + "zstd 0.13.2", ] [[package]] name = "iceberg-catalog-glue" -version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21#2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" +version = "0.4.0" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=683fb89edeaf8d1baae69e1f376d68b92be1d496#683fb89edeaf8d1baae69e1f376d68b92be1d496" dependencies = [ "anyhow", "async-trait", @@ -6294,8 +6295,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" -version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21#2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" +version = "0.4.0" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=683fb89edeaf8d1baae69e1f376d68b92be1d496#683fb89edeaf8d1baae69e1f376d68b92be1d496" dependencies = [ "async-trait", "chrono", @@ -8281,9 +8282,9 @@ dependencies = [ [[package]] name = "opendal" -version = "0.50.1" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213222b6c86949314d8f51acb26d8241e7c8dd0879b016a79471d49f21ee592f" +checksum = "6c8cd8697b917793c15a7b4a8afcba44e35e2abbc55c363064851776f7c81136" dependencies = [ "anyhow", "async-trait", @@ -8292,7 +8293,6 @@ dependencies = [ "bytes", "chrono", "crc32c", - "flagset", "futures", "getrandom", "http 1.2.0", @@ -9615,7 +9615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.87", @@ -9807,7 +9807,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.1", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -10199,9 +10199,9 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa" +checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index b0659c775455f..6860d9dc87126 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,10 +152,10 @@ prost-build = { version = "0.13" } icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44fa826c91139c9cf459b005741df990ae9da", features = [ "prometheus", ] } -# branch dev-rebase-main-20241030 -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" } +# branch dev_rebase_main_20241230 +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" } opendal = "0.49" # used only by arrow-udf-flight arrow-flight = "53" diff --git a/src/batch/executors/src/executor/iceberg_scan.rs b/src/batch/executors/src/executor/iceberg_scan.rs index 027426925ad34..2e5ee3b60d38b 100644 --- a/src/batch/executors/src/executor/iceberg_scan.rs +++ b/src/batch/executors/src/executor/iceberg_scan.rs @@ -190,7 +190,8 @@ impl IcebergScanExecutor { .build(); let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task)); - let mut record_batch_stream = reader.read(Box::pin(file_scan_stream))?.enumerate(); + let mut record_batch_stream = + reader.read(Box::pin(file_scan_stream)).await?.enumerate(); while let Some((index, record_batch)) = record_batch_stream.next().await { let record_batch = record_batch?; @@ -321,7 +322,9 @@ impl PositionDeleteFilter { let reader = table.reader_builder().with_batch_size(batch_size).build(); - let mut record_batch_stream = reader.read(Box::pin(position_delete_file_scan_stream))?; + let mut record_batch_stream = reader + .read(Box::pin(position_delete_file_scan_stream)) + .await?; while let Some(record_batch) = record_batch_stream.next().await { let record_batch = record_batch?; diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index a599dddc74982..7dad6b79850cc 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -21,13 +21,17 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; use async_trait::async_trait; -use iceberg::arrow::schema_to_arrow_schema; +use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; use iceberg::spec::{DataFile, SerializedDataFile}; use iceberg::table::Table; use iceberg::transaction::Transaction; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; -use iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder; -use iceberg::writer::base_writer::sort_position_delete_writer::SortPositionDeleteWriterBuilder; +use iceberg::writer::base_writer::equality_delete_writer::{ + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, +}; +use iceberg::writer::base_writer::sort_position_delete_writer::{ + SortPositionDeleteWriterBuilder, POSITION_DELETE_SCHEMA, +}; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; @@ -540,6 +544,7 @@ impl IcebergSinkWriter { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::new(), + schema.clone(), table.file_io().clone(), DefaultLocationGenerator::new(table.metadata().clone()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, @@ -549,8 +554,7 @@ impl IcebergSinkWriter { iceberg::spec::DataFileFormat::Parquet, ), ); - let data_file_builder = - DataFileWriterBuilder::new(schema.clone(), parquet_writer_builder, None); + let data_file_builder = DataFileWriterBuilder::new(parquet_writer_builder, None); if let Some(_extra_partition_col_idx) = extra_partition_col_idx { Err(SinkError::Iceberg(anyhow!( "Extra partition column is not supported in append-only mode" @@ -586,8 +590,12 @@ impl IcebergSinkWriter { }) } else { let partition_builder = MonitoredGeneralWriterBuilder::new( - FanoutPartitionWriterBuilder::new(data_file_builder, partition_spec.clone()) - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, + FanoutPartitionWriterBuilder::new( + data_file_builder, + partition_spec.clone(), + schema.clone(), + ) + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, write_qps.clone(), write_latency.clone(), ); @@ -662,6 +670,7 @@ impl IcebergSinkWriter { let data_file_builder = { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::new(), + schema.clone(), table.file_io().clone(), DefaultLocationGenerator::new(table.metadata().clone()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, @@ -671,11 +680,12 @@ impl IcebergSinkWriter { iceberg::spec::DataFileFormat::Parquet, ), ); - DataFileWriterBuilder::new(schema.clone(), parquet_writer_builder.clone(), None) + DataFileWriterBuilder::new(parquet_writer_builder.clone(), None) }; let position_delete_builder = { let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::new(), + POSITION_DELETE_SCHEMA.clone(), table.file_io().clone(), DefaultLocationGenerator::new(table.metadata().clone()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, @@ -691,8 +701,18 @@ impl IcebergSinkWriter { ) }; let equality_delete_builder = { + let config = EqualityDeleteWriterConfig::new( + unique_column_ids.clone(), + table.metadata().current_schema().clone(), + None, + ) + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::new(), + Arc::new( + arrow_schema_to_schema(config.projected_arrow_schema_ref()) + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, + ), table.file_io().clone(), DefaultLocationGenerator::new(table.metadata().clone()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, @@ -703,13 +723,7 @@ impl IcebergSinkWriter { ), ); - EqualityDeleteFileWriterBuilder::new( - parquet_writer_builder.clone(), - unique_column_ids.clone(), - table.metadata().current_schema().clone(), - None, - ) - .map_err(|err| SinkError::Iceberg(anyhow!(err)))? + EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config) }; let delta_builder = EqualityDeltaWriterBuilder::new( data_file_builder, @@ -762,19 +776,6 @@ impl IcebergSinkWriter { }, }) } else { - let partition_builder = MonitoredGeneralWriterBuilder::new( - FanoutPartitionWriterBuilder::new(delta_builder, partition_spec.clone()) - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, - write_qps.clone(), - write_latency.clone(), - ); - let inner_writer = Some(Box::new( - partition_builder - .clone() - .build() - .await - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, - ) as Box); let original_arrow_schema = Arc::new( schema_to_arrow_schema(table.metadata().current_schema()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, @@ -788,6 +789,23 @@ impl IcebergSinkWriter { ))); Arc::new(ArrowSchema::new(new_fields)) }; + let partition_builder = MonitoredGeneralWriterBuilder::new( + FanoutPartitionWriterBuilder::new_with_custom_schema( + delta_builder, + schema_with_extra_op_column.clone(), + partition_spec.clone(), + table.metadata().current_schema().clone(), + ), + write_qps.clone(), + write_latency.clone(), + ); + let inner_writer = Some(Box::new( + partition_builder + .clone() + .build() + .await + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?, + ) as Box); Ok(Self { arrow_schema: original_arrow_schema, metrics: IcebergWriterMetrics { @@ -1020,11 +1038,7 @@ impl SinkWriter for IcebergSinkWriter { match close_result { Some(Ok(result)) => { let version = self.table.metadata().format_version() as u8; - let partition_type = self - .table - .metadata() - .default_partition_spec() - .partition_type(); + let partition_type = self.table.metadata().default_partition_type(); let data_files = result .into_iter() .map(|f| { @@ -1216,17 +1230,17 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { write_results[0].partition_spec_id ))); }; - let bound_partition_spec = partition_spec + let partition_type = partition_spec .as_ref() .clone() - .bind(schema.clone()) + .partition_type(schema) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; let data_files = write_results .into_iter() .flat_map(|r| { r.data_files.into_iter().map(|f| { - f.try_into(bound_partition_spec.partition_type(), schema) + f.try_into(&partition_type, schema) .map_err(|err| SinkError::Iceberg(anyhow!(err))) }) })