diff --git a/Cargo.lock b/Cargo.lock
index 756d000e14724..f751e3c312036 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7160,6 +7160,7 @@ dependencies = [
  "mz-persist-types",
  "mz-pgcopy",
  "mz-pgrepr",
+ "mz-proto",
  "mz-repr",
  "mz-storage-types",
  "mz-timely-util",
@@ -7167,6 +7168,7 @@ dependencies = [
  "parquet",
  "prometheus",
  "proptest",
+ "prost",
  "reqwest 0.11.24",
  "sentry",
  "serde",
@@ -8037,6 +8039,7 @@ dependencies = [
  "bytes",
  "chrono",
  "flate2",
+ "futures",
  "half 2.4.1",
  "hashbrown 0.15.2",
  "lz4_flex",
@@ -8046,6 +8049,7 @@ dependencies = [
  "seq-macro",
  "snap",
  "thrift",
+ "tokio",
  "twox-hash",
  "zstd",
  "zstd-sys",
diff --git a/src/adapter/src/coord/sequencer/inner/copy_from.rs b/src/adapter/src/coord/sequencer/inner/copy_from.rs
index 112c1251b516a..3bead8abb07f7 100644
--- a/src/adapter/src/coord/sequencer/inner/copy_from.rs
+++ b/src/adapter/src/coord/sequencer/inner/copy_from.rs
@@ -80,6 +80,7 @@ impl Coordinator {
             CopyFormatParams::Csv(csv) => {
                 mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
             }
+            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
             CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
                 mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
                 ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
diff --git a/src/pgcopy/src/copy.proto b/src/pgcopy/src/copy.proto
index b83c5242d7f13..799a31a627a48 100644
--- a/src/pgcopy/src/copy.proto
+++ b/src/pgcopy/src/copy.proto
@@ -18,6 +18,7 @@ message ProtoCopyFormatParams {
     ProtoCopyTextFormatParams text = 1;
     ProtoCopyCsvFormatParams csv = 2;
     google.protobuf.Empty binary = 3;
+    ProtoCopyParquetFormatParams parquet = 4;
   }
 }
@@ -33,3 +34,5 @@ message ProtoCopyCsvFormatParams {
   bool header = 4;
   string null = 5;
 }
+
+message ProtoCopyParquetFormatParams {}
diff --git a/src/pgcopy/src/copy.rs b/src/pgcopy/src/copy.rs
index c057f02a24fa5..03babcf9d432b 100644
--- a/src/pgcopy/src/copy.rs
+++ b/src/pgcopy/src/copy.rs
@@ -436,6 +436,7 @@ pub enum CopyFormatParams<'a> {
     Text(CopyTextFormatParams<'a>),
     Csv(CopyCsvFormatParams<'a>),
     Binary,
+    Parquet,
 }
 
 impl RustType<ProtoCopyFormatParams> for CopyFormatParams<'static> {
@@ -446,6 +447,7 @@ impl RustType<ProtoCopyFormatParams> for CopyFormatParams<'static> {
                 Self::Text(f) => Kind::Text(f.into_proto()),
                 Self::Csv(f) => Kind::Csv(f.into_proto()),
                 Self::Binary => Kind::Binary(()),
+                Self::Parquet => Kind::Parquet(ProtoCopyParquetFormatParams::default()),
             }),
         }
     }
@@ -456,6 +458,7 @@
             Some(Kind::Text(f)) => Ok(Self::Text(f.into_rust()?)),
             Some(Kind::Csv(f)) => Ok(Self::Csv(f.into_rust()?)),
             Some(Kind::Binary(())) => Ok(Self::Binary),
+            Some(Kind::Parquet(ProtoCopyParquetFormatParams {})) => Ok(Self::Parquet),
             None => Err(TryFromProtoError::missing_field(
                 "ProtoCopyFormatParams::kind",
             )),
@@ -482,6 +485,7 @@ impl CopyFormatParams<'static> {
             &CopyFormatParams::Text(_) => "txt",
             &CopyFormatParams::Csv(_) => "csv",
             &CopyFormatParams::Binary => "bin",
+            &CopyFormatParams::Parquet => "parquet",
         }
     }
 
@@ -490,6 +494,7 @@
             CopyFormatParams::Text(_) => false,
             CopyFormatParams::Csv(params) => params.header,
             CopyFormatParams::Binary => false,
+            CopyFormatParams::Parquet => false,
         }
     }
 }
@@ -507,6 +512,10 @@ pub fn decode_copy_format<'a>(
             io::ErrorKind::Unsupported,
             "cannot decode as binary format",
         )),
+        CopyFormatParams::Parquet => {
+            // TODO(cf2): Support Parquet over STDIN.
+            return Err(io::Error::new(io::ErrorKind::Unsupported, "parquet format"));
+        }
     }
 }
 
@@ -521,6 +530,10 @@ pub fn encode_copy_format<'a>(
         CopyFormatParams::Text(params) => encode_copy_row_text(params, row, typ, out),
         CopyFormatParams::Csv(params) => encode_copy_row_csv(params, row, typ, out),
         CopyFormatParams::Binary => encode_copy_row_binary(row, typ, out),
+        CopyFormatParams::Parquet => {
+            // TODO(cf2): Support Parquet over STDIN.
+            return Err(io::Error::new(io::ErrorKind::Unsupported, "parquet format"));
+        }
     }
 }
 
@@ -546,6 +559,10 @@ pub fn encode_copy_format_header<'a>(
             ]);
             encode_copy_row_csv(params, &header_row, &typ, out)
         }
+        CopyFormatParams::Parquet => {
+            // TODO(cf2): Support Parquet over STDIN.
+            return Err(io::Error::new(io::ErrorKind::Unsupported, "parquet format"));
+        }
     }
 }
diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs
index 41034fdeef264..20831ad577d16 100644
--- a/src/sql/src/plan/statement/dml.rs
+++ b/src/sql/src/plan/statement/dml.rs
@@ -1194,7 +1194,7 @@ fn plan_copy_from(
             )
         }
         CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
-        CopyFormat::Parquet => bail_unsupported!("FORMAT PARQUET"),
+        CopyFormat::Parquet => CopyFormatParams::Parquet,
     };
 
     let filter = match (options.files, options.pattern) {
diff --git a/src/storage-operators/BUILD.bazel b/src/storage-operators/BUILD.bazel
index 964b6aea7bfaa..7e51893eaccb1 100644
--- a/src/storage-operators/BUILD.bazel
+++ b/src/storage-operators/BUILD.bazel
@@ -38,6 +38,7 @@ rust_library(
        "//src/persist-types:mz_persist_types",
        "//src/pgcopy:mz_pgcopy",
        "//src/pgrepr:mz_pgrepr",
+       "//src/proto:mz_proto",
        "//src/repr:mz_repr",
        "//src/storage-types:mz_storage_types",
        "//src/timely-util:mz_timely_util",
@@ -81,6 +82,7 @@ rust_test(
        "//src/persist-types:mz_persist_types",
        "//src/pgcopy:mz_pgcopy",
        "//src/pgrepr:mz_pgrepr",
+       "//src/proto:mz_proto",
        "//src/repr:mz_repr",
        "//src/storage-types:mz_storage_types",
        "//src/timely-util:mz_timely_util",
@@ -104,6 +106,7 @@ rust_doc_test(
        "//src/persist-types:mz_persist_types",
        "//src/pgcopy:mz_pgcopy",
        "//src/pgrepr:mz_pgrepr",
+       "//src/proto:mz_proto",
        "//src/repr:mz_repr",
        "//src/storage-types:mz_storage_types",
        "//src/timely-util:mz_timely_util",
diff --git a/src/storage-operators/Cargo.toml b/src/storage-operators/Cargo.toml
index 9f9705788510c..8749ea8d8dafb 100644
--- a/src/storage-operators/Cargo.toml
+++ b/src/storage-operators/Cargo.toml
@@ -34,13 +34,15 @@ mz-persist-client = { path = "../persist-client" }
 mz-persist-types = { path = "../persist-types" }
 mz-pgcopy = { path = "../pgcopy" }
 mz-pgrepr = { path = "../pgrepr" }
+mz-proto = { path = "../proto" }
 mz-repr = { path = "../repr" }
 mz-storage-types = { path = "../storage-types" }
 mz-timely-util = { path = "../timely-util" }
 mz-txn-wal = { path = "../txn-wal" }
-parquet = { version = "53.3.0", default-features = false, features = ["arrow", "snap"] }
+parquet = { version = "53.3.0", default-features = false, features = ["arrow", "async", "snap"] }
 prometheus = { version = "0.13.3", default-features = false }
 proptest = { version = "1.6.0", default-features = false, features = ["std"] }
+prost = "0.13.2"
 reqwest = { version = "0.11.13", features = ["stream"] }
 sentry = { version = "0.29.1" }
 serde = { version = "1.0.152", features = ["derive"] }
diff --git a/src/storage-operators/src/oneshot_source.rs b/src/storage-operators/src/oneshot_source.rs
index 7755d087b3c2e..7dd5efd26d199 100644
--- a/src/storage-operators/src/oneshot_source.rs
+++ b/src/storage-operators/src/oneshot_source.rs
@@ -99,8 +99,10 @@ use tracing::info;
 use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
 use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
 use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
+use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
 
 pub mod csv;
+pub mod parquet;
 
 pub mod aws_source;
 pub mod http_source;
@@ -162,6 +164,10 @@ where
             let format = CsvDecoder::new(params, &collection_meta.relation_desc);
             FormatKind::Csv(format)
         }
+        ContentFormat::Parquet => {
+            let format = ParquetFormat::new(collection_meta.relation_desc.clone());
+            FormatKind::Parquet(format)
+        }
     };
 
     // Discover what objects are available to copy.
@@ -627,6 +633,9 @@ pub trait OneshotObject {
     /// Name of the object, including any extensions.
     fn name(&self) -> &str;
 
+    /// Size of this object in bytes.
+    fn size(&self) -> usize;
+
     /// Encodings of the _entire_ object, if any.
     ///
     /// Note: The object may internally use compression, e.g. a Parquet file
@@ -645,11 +654,18 @@ pub enum Encoding {
 }
 
 /// Defines a remote system that we can fetch data from for a "one time" ingestion.
-pub trait OneshotSource: Clone + Send {
+pub trait OneshotSource: Clone + Send + Unpin {
     /// An individual unit within the source, e.g. a file.
-    type Object: OneshotObject + Debug + Clone + Send + Serialize + DeserializeOwned + 'static;
+    type Object: OneshotObject
+        + Debug
+        + Clone
+        + Send
+        + Unpin
+        + Serialize
+        + DeserializeOwned
+        + 'static;
 
     /// Checksum for a [`Self::Object`].
-    type Checksum: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;
+    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;
 
     /// Returns all of the objects for this source.
     fn list<'a>(
@@ -744,6 +760,13 @@ impl OneshotObject for ObjectKind {
         }
     }
 
+    fn size(&self) -> usize {
+        match self {
+            ObjectKind::Http(object) => object.size(),
+            ObjectKind::AwsS3(object) => object.size(),
+        }
+    }
+
     fn encodings(&self) -> &[Encoding] {
         match self {
             ObjectKind::Http(object) => object.encodings(),
@@ -781,7 +804,7 @@ pub trait OneshotFormat: Clone {
 
     /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
     /// can later be decoded.
-    fn fetch_work<'a, S: OneshotSource + Sync>(
+    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
@@ -801,9 +824,10 @@ pub trait OneshotFormat: Clone {
 /// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
 /// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
 /// types.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub(crate) enum FormatKind {
     Csv(CsvDecoder),
+    Parquet(ParquetFormat),
 }
 
 impl OneshotFormat for FormatKind {
@@ -830,10 +854,20 @@ impl OneshotFormat for FormatKind {
                     .collect();
                 Ok(work)
             }
+            FormatKind::Parquet(parquet) => {
+                let work = parquet
+                    .split_work(source, object, checksum)
+                    .await
+                    .context("parquet")?
+                    .into_iter()
+                    .map(RequestKind::Parquet)
+                    .collect();
+                Ok(work)
+            }
         }
     }
 
-    fn fetch_work<'a, S: OneshotSource + Sync>(
+    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
@@ -844,6 +878,15 @@ impl OneshotFormat for FormatKind {
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
+            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
+                .fetch_work(source, request)
+                .map_ok(RecordChunkKind::Parquet)
+                .map(|result| result.context("parquet"))
+                .boxed(),
+            (FormatKind::Parquet(_), RequestKind::Csv(_))
+            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
+                unreachable!("programming error, {self:?}")
+            }
         }
     }
 
@@ -856,6 +899,13 @@ impl OneshotFormat for FormatKind {
             (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                 csv.decode_chunk(chunk, rows).context("csv")
             }
+            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
+                parquet.decode_chunk(chunk, rows).context("parquet")
+            }
+            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
+            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
+                unreachable!("programming error, {self:?}")
+            }
         }
     }
 }
@@ -863,11 +913,13 @@
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) enum RequestKind<O, C> {
     Csv(CsvWorkRequest<O, C>),
+    Parquet(ParquetWorkRequest<O, C>),
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(crate) enum RecordChunkKind {
     Csv(CsvRecord),
+    Parquet(ParquetRowGroup),
 }
 
 pub(crate) enum ObjectFilter {
@@ -926,6 +978,8 @@ impl fmt::Display for StorageErrorX {
 pub enum StorageErrorXKind {
     #[error("csv decoding error: {0}")]
     CsvDecoding(Arc<str>),
+    #[error("parquet error: {0}")]
+    ParquetError(Arc<str>),
     #[error("reqwest error: {0}")]
     Reqwest(Arc<str>),
     #[error("aws s3 request error: {0}")]
@@ -970,6 +1024,12 @@ impl From for StorageErrorXKind {
     }
 }
 
+impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
+    fn from(err: ::parquet::errors::ParquetError) -> Self {
+        StorageErrorXKind::ParquetError(err.to_string().into())
+    }
+}
+
 impl StorageErrorXKind {
     pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
         StorageErrorX {
diff --git a/src/storage-operators/src/oneshot_source/aws_source.rs b/src/storage-operators/src/oneshot_source/aws_source.rs
index 75349fd2e71fe..6478376304efe 100644
--- a/src/storage-operators/src/oneshot_source/aws_source.rs
+++ b/src/storage-operators/src/oneshot_source/aws_source.rs
@@ -119,6 +119,10 @@ impl OneshotObject for S3Object {
         &self.name
     }
 
+    fn size(&self) -> usize {
+        self.size
+    }
+
     fn encodings(&self) -> &[super::Encoding] {
         &[]
     }
diff --git a/src/storage-operators/src/oneshot_source/csv.rs b/src/storage-operators/src/oneshot_source/csv.rs
index a935a19f0d803..2806b01715a1b 100644
--- a/src/storage-operators/src/oneshot_source/csv.rs
+++ b/src/storage-operators/src/oneshot_source/csv.rs
@@ -26,7 +26,7 @@ use crate::oneshot_source::{
     Encoding, OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXKind,
 };
 
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct CsvDecoder {
     /// Properties of the CSV Reader.
     params: CopyCsvFormatParams<'static>,
@@ -106,7 +106,7 @@ impl OneshotFormat for CsvDecoder {
         Ok(vec![request])
     }
 
-    fn fetch_work<'a, S: OneshotSource + Sync>(
+    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
diff --git a/src/storage-operators/src/oneshot_source/http_source.rs b/src/storage-operators/src/oneshot_source/http_source.rs
index bfbc1236f70c9..b68ae2ce491e6 100644
--- a/src/storage-operators/src/oneshot_source/http_source.rs
+++ b/src/storage-operators/src/oneshot_source/http_source.rs
@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
 use url::Url;
 
 use crate::oneshot_source::{
-    Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
+    Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind,
 };
 
 /// Generic oneshot source that fetches a file from a URL on the public internet.
@@ -43,6 +43,8 @@ pub struct HttpObject {
     url: Url,
     /// Name of the file.
     filename: String,
+    /// Size of this file reported by the [`Content-Length`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) header.
+    size: usize,
     /// Any values reporting from the [`Content-Encoding`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding) header.
     content_encoding: Vec<Encoding>,
 }
@@ -52,6 +54,10 @@ impl OneshotObject for HttpObject {
         &self.filename
     }
 
+    fn size(&self) -> usize {
+        self.size
+    }
+
     fn encodings(&self) -> &[Encoding] {
         &self.content_encoding
     }
@@ -103,6 +109,15 @@
             HttpChecksum::None
         };
 
+        // Get the size of the object from the Content-Length header.
+        let size = get_header(&reqwest::header::CONTENT_LENGTH)
+            .ok_or_else(|| StorageErrorXKind::MissingSize)
+            .and_then(|s| {
+                s.parse::<usize>()
+                    .map_err(|e| StorageErrorXKind::generic(e))
+            })
+            .context("content-length header")?;
+
         // TODO(cf1): We should probably check the content-type as well. At least for advisory purposes.
 
         let filename = self
@@ -114,6 +129,7 @@ impl OneshotSource for HttpOneshotSource {
         let object = HttpObject {
             url: self.origin.clone(),
             filename,
+            size,
             content_encoding: vec![],
         };
         tracing::info!(?object, "found objects");
@@ -125,13 +141,23 @@
        &'s self,
        object: Self::Object,
        _checksum: Self::Checksum,
-        _range: Option<std::ops::RangeInclusive<usize>>,
+        range: Option<std::ops::RangeInclusive<usize>>,
     ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
-        // TODO(cf1): Support the range param.
         // TODO(cf1): Validate our checksum.
 
         let initial_response = async move {
-            let response = self.client.get(object.url).send().await.context("get")?;
+            let mut request = self.client.get(object.url);
+
+            if let Some(range) = &range {
+                let value = format!("bytes={}-{}", range.start(), range.end());
+                request = request.header(&reqwest::header::RANGE, value);
+            }
+
+            // TODO(parkmycar): We should probably assert that the response contains
+            // an appropriate Content-Range header in the response, and maybe that we
+            // got back an HTTP 206?
+
+            let response = request.send().await.context("get")?;
             let bytes_stream = response.bytes_stream().err_into();
 
             Ok::<_, StorageErrorX>(bytes_stream)
diff --git a/src/storage-operators/src/oneshot_source/parquet.rs b/src/storage-operators/src/oneshot_source/parquet.rs
new file mode 100644
index 0000000000000..06f581bf064c1
--- /dev/null
+++ b/src/storage-operators/src/oneshot_source/parquet.rs
@@ -0,0 +1,283 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Parquet [`OneshotFormat`].
+
+use std::fmt;
+use std::sync::Arc;
+
+use arrow::array::{Array, RecordBatch, StructArray};
+use bytes::{Bytes, BytesMut};
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use mz_persist_types::arrow::ProtoArrayData;
+use mz_proto::{ProtoType, RustType};
+use mz_repr::RelationDesc;
+use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::errors::ParquetError;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use prost::Message;
+use serde::de::Visitor;
+use serde::{Deserialize, Deserializer, Serialize};
+use smallvec::{smallvec, SmallVec};
+
+use crate::oneshot_source::{
+    OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
+    StorageErrorXKind,
+};
+
+#[derive(Debug, Clone)]
+pub struct ParquetFormat {
+    desc: RelationDesc,
+}
+
+impl ParquetFormat {
+    pub fn new(desc: RelationDesc) -> Self {
+        ParquetFormat { desc }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ParquetWorkRequest<O, C> {
+    object: O,
+    checksum: C,
+    row_groups: SmallVec<[usize; 1]>,
+}
+
+#[derive(Clone, Debug)]
+pub struct ParquetRowGroup {
+    record_batch: RecordBatch,
+}
+
+impl OneshotFormat for ParquetFormat {
+    type WorkRequest<S>
+        = ParquetWorkRequest<S::Object, S::Checksum>
+    where
+        S: OneshotSource;
+    type RecordChunk = ParquetRowGroup;
+
+    async fn split_work<S: OneshotSource + Sync>(
+        &self,
+        source: S,
+        object: S::Object,
+        checksum: S::Checksum,
+    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
+        let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
+        let parquet_metadata = adapter.get_metadata().await?;
+
+        tracing::info!(
+            object = object.name(),
+            row_groups = parquet_metadata.num_row_groups(),
+            "splitting Parquet object"
+        );
+
+        // Split up the file by the number of RowGroups.
+        //
+        // TODO(cf3): Support splitting up large RowGroups.
+        let work = (0..parquet_metadata.num_row_groups())
+            .map(|row_group| ParquetWorkRequest {
+                object: object.clone(),
+                checksum: checksum.clone(),
+                row_groups: smallvec![row_group],
+            })
+            .collect();
+
+        Ok(work)
+    }
+
+    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
+        &'a self,
+        source: &'a S,
+        request: Self::WorkRequest<S>,
+    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
+        let ParquetWorkRequest {
+            object,
+            checksum,
+            row_groups,
+        } = request;
+
+        let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());
+
+        let initial_work = async move {
+            ParquetRecordBatchStreamBuilder::new(adapter)
+                .await?
+                .with_row_groups(row_groups.to_vec())
+                .build()
+        };
+
+        futures::stream::once(initial_work)
+            .try_flatten()
+            .map_ok(|record_batch| ParquetRowGroup { record_batch })
+            .err_into()
+            .boxed()
+    }
+
+    fn decode_chunk(
+        &self,
+        chunk: Self::RecordChunk,
+        rows: &mut Vec<mz_repr::Row>,
+    ) -> Result<usize, StorageErrorX> {
+        let ParquetRowGroup { record_batch } = chunk;
+
+        let struct_array = StructArray::from(record_batch);
+        let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
+            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
+            .context("reader")?;
+        let rows_read = reader
+            .read_all(rows)
+            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
+            .context("read_all")?;
+
+        Ok(rows_read)
+    }
+}
+
+/// A newtype wrapper around a [`OneshotSource`] that allows us to implement
+/// [`AsyncFileReader`] and [`MetadataFetch`] for all types that implement
+/// [`OneshotSource`].
+#[derive(Clone)]
+struct ParquetReaderAdapter<S: OneshotSource> {
+    source: S,
+    object: S::Object,
+    checksum: S::Checksum,
+}
+
+impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ObjectStoreAdapter").finish()
+    }
+}
+
+impl<S: OneshotSource> ParquetReaderAdapter<S> {
+    fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
+        ParquetReaderAdapter {
+            source,
+            object,
+            checksum,
+        }
+    }
+}
+
+impl<S: OneshotSource + Sync> MetadataFetch for ParquetReaderAdapter<S> {
+    fn fetch(
+        &mut self,
+        range: std::ops::Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));
+
+        Box::pin(async move {
+            // Fetch the specified range.
+            let result: Result<Vec<Bytes>, _> = self
+                .source
+                .get(
+                    self.object.clone(),
+                    self.checksum.clone(),
+                    Some(range.start..=inclusive_end),
+                )
+                .try_collect()
+                .await;
+            let bytes = match result {
+                Err(e) => return Err(ParquetError::General(e.to_string())),
+                Ok(bytes) => bytes,
+            };
+
+            // Join the stream into a single chunk.
+            let total_length = inclusive_end.saturating_sub(range.start);
+            let mut joined_bytes = BytesMut::with_capacity(total_length);
+            joined_bytes.extend(bytes);
+
+            Ok(joined_bytes.freeze())
+        })
+    }
+}
+
+impl<S: OneshotSource + Sync> AsyncFileReader for ParquetReaderAdapter<S> {
+    fn get_bytes(
+        &mut self,
+        range: std::ops::Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        MetadataFetch::fetch(self, range)
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let mut reader = ParquetMetaDataReader::new();
+            let object_size = self.object.size();
+            reader.try_load(self, object_size).await?;
+            reader.finish().map(|metadata| Arc::new(metadata))
+        })
+    }
+}
+
+// Note(parkmycar): Instead of a manual implementation of Serialize and Deserialize we could
+// change `ParquetRowGroup` to have a type which we can derive the impl for. But no types from the
+// `arrow` crate do, and we'd prefer not to use a Vec<u8> since serialization is only required when
+// Timely workers span multiple processes.
+impl Serialize for ParquetRowGroup {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        // Note: This implementation isn't very efficient, but it should only be rarely used so
+        // it's not too much of a concern.
+        let struct_array = StructArray::from(self.record_batch.clone());
+        let proto_array: ProtoArrayData = struct_array.into_data().into_proto();
+        let encoded_proto = proto_array.encode_to_vec();
+        encoded_proto.serialize(serializer)
+    }
+}
+
+impl<'de> Deserialize<'de> for ParquetRowGroup {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        fn struct_array<'de: 'a, 'a, D: Deserializer<'de>>(
+            deserializer: D,
+        ) -> Result<StructArray, D::Error> {
+            struct StructArrayVisitor;
+
+            impl<'a> Visitor<'a> for StructArrayVisitor {
+                type Value = StructArray;
+
+                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                    formatter.write_str("binary data")
+                }
+
+                fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
+                where
+                    E: serde::de::Error,
+                {
+                    let serde_err =
+                        || serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self);
+
+                    let array_data = ProtoArrayData::decode(v)
+                        .map_err(|_| serde_err())
+                        .and_then(|proto_array| proto_array.into_rust().map_err(|_| serde_err()))?;
+                    let array_ref = arrow::array::make_array(array_data);
+                    let struct_array = array_ref
+                        .as_any()
+                        .downcast_ref::<StructArray>()
+                        .ok_or_else(|| serde_err())?;
+
+                    Ok(struct_array.clone())
+                }
+            }
+
+            deserializer.deserialize_bytes(StructArrayVisitor)
+        }
+
+        let struct_array = struct_array(deserializer)?;
+        let record_batch = RecordBatch::from(struct_array);
+
+        Ok(ParquetRowGroup { record_batch })
+    }
+}
diff --git a/src/storage-types/src/oneshot_sources.proto b/src/storage-types/src/oneshot_sources.proto
index eb88a65a2ccf4..1ff23ff0f8888 100644
--- a/src/storage-types/src/oneshot_sources.proto
+++ b/src/storage-types/src/oneshot_sources.proto
@@ -24,6 +24,7 @@ message ProtoOneshotIngestionRequest {
 
   oneof format {
     ProtoCsvContentFormat csv = 2;
+    ProtoParquetContentFormat parquet = 3;
   }
 
   oneof filter {
@@ -33,9 +34,7 @@ message ProtoOneshotIngestionRequest {
   }
 }
 
-message ProtoHttpContentSource {
-  string url = 1;
-}
+message ProtoHttpContentSource { string url = 1; }
 
 message ProtoAwsS3Source {
   mz_storage_types.connections.aws.ProtoAwsConnection connection = 1;
@@ -47,6 +46,8 @@ message ProtoCsvContentFormat {
   mz_pgcopy.copy.ProtoCopyCsvFormatParams params = 1;
 }
 
+message ProtoParquetContentFormat {}
+
 message ProtoFilterFiles {
   repeated string files = 1;
 }
diff --git a/src/storage-types/src/oneshot_sources.rs b/src/storage-types/src/oneshot_sources.rs
index 5e679081269ee..3c828dc5f67ad 100644
--- a/src/storage-types/src/oneshot_sources.rs
+++ b/src/storage-types/src/oneshot_sources.rs
@@ -131,6 +131,7 @@ impl RustType for ContentSource {
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
 pub enum ContentFormat {
     Csv(CopyCsvFormatParams<'static>),
+    Parquet,
 }
 
 impl RustType<proto_oneshot_ingestion_request::Format> for ContentFormat {
@@ -141,6 +142,9 @@ impl RustType<proto_oneshot_ingestion_request::Format> for ContentFormat {
                     params: Some(params.into_proto()),
                 })
             }
+            ContentFormat::Parquet => proto_oneshot_ingestion_request::Format::Parquet(
+                ProtoParquetContentFormat::default(),
+            ),
         }
     }
 
@@ -152,6 +156,9 @@ impl RustType<proto_oneshot_ingestion_request::Format> for ContentFormat {
                 let params = params.into_rust_if_some("ProtoCsvContentFormat::params")?;
                 Ok(ContentFormat::Csv(params))
             }
+            proto_oneshot_ingestion_request::Format::Parquet(ProtoParquetContentFormat {}) => {
+                Ok(ContentFormat::Parquet)
+            }
         }
     }
 }
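Reviewer note: the row-group-oriented read path above (split_work discovers row groups from the footer, fetch_work reads exactly one of them) leans on the parquet crate's async reader. Below is a minimal, standalone sketch of that same API against a local file, not part of the patch: the file name `example.parquet` is hypothetical, `tokio::fs::File` stands in for `ParquetReaderAdapter` (the parquet crate provides `AsyncFileReader` for tokio `AsyncRead + AsyncSeek` types when the `async` feature is enabled), and it assumes a tokio runtime with the `fs`, `macros`, and `rt-multi-thread` features.

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical local file; in the patch this is a ParquetReaderAdapter
    // wrapping an HTTP or S3 OneshotSource object.
    let file = tokio::fs::File::open("example.parquet").await?;

    // Reading the footer here is what `split_work` does via `get_metadata`.
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    println!("row groups: {}", builder.metadata().num_row_groups());

    // Scope the read to a single row group, mirroring `fetch_work`, which
    // receives one ParquetWorkRequest per row group from `split_work`.
    let mut stream = builder.with_row_groups(vec![0]).build()?;
    while let Some(record_batch) = stream.try_next().await? {
        println!("decoded a batch of {} rows", record_batch.num_rows());
    }

    Ok(())
}
```

Splitting work on row-group boundaries keeps each request fetchable with a single byte-range read, which is also why the HTTP source gains Range header support in this same patch.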