Skip to content

Commit

Permalink
start, support Parquet for COPY FROM
Browse files Browse the repository at this point in the history
* Add a new Parquet OneshotFormat
* Fix ranged requests for HTTP and AWS sources
  • Loading branch information
ParkMyCar committed Jan 24, 2025
1 parent 58f62b2 commit 3d8c1bf
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 17 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl Coordinator {
CopyFormatParams::Csv(csv) => {
mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
}
CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
Expand Down
3 changes: 3 additions & 0 deletions src/pgcopy/src/copy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message ProtoCopyFormatParams {
ProtoCopyTextFormatParams text = 1;
ProtoCopyCsvFormatParams csv = 2;
google.protobuf.Empty binary = 3;
ProtoCopyParquetFormatParams parquet = 4;
}
}

Expand All @@ -33,3 +34,5 @@ message ProtoCopyCsvFormatParams {
bool header = 4;
string null = 5;
}

// Parquet currently carries no format-specific options; the empty message
// reserves a spot so fields can be added later without a wire-format break.
message ProtoCopyParquetFormatParams {}
17 changes: 17 additions & 0 deletions src/pgcopy/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ pub enum CopyFormatParams<'a> {
Text(CopyTextFormatParams<'a>),
Csv(CopyCsvFormatParams<'a>),
Binary,
Parquet,
}

impl RustType<ProtoCopyFormatParams> for CopyFormatParams<'static> {
Expand All @@ -446,6 +447,7 @@ impl RustType<ProtoCopyFormatParams> for CopyFormatParams<'static> {
Self::Text(f) => Kind::Text(f.into_proto()),
Self::Csv(f) => Kind::Csv(f.into_proto()),
Self::Binary => Kind::Binary(()),
Self::Parquet => Kind::Parquet(ProtoCopyParquetFormatParams::default()),
}),
}
}
Expand All @@ -456,6 +458,7 @@ impl RustType<ProtoCopyFormatParams> for CopyFormatParams<'static> {
Some(Kind::Text(f)) => Ok(Self::Text(f.into_rust()?)),
Some(Kind::Csv(f)) => Ok(Self::Csv(f.into_rust()?)),
Some(Kind::Binary(())) => Ok(Self::Binary),
Some(Kind::Parquet(ProtoCopyParquetFormatParams {})) => Ok(Self::Parquet),
None => Err(TryFromProtoError::missing_field(
"ProtoCopyFormatParams::kind",
)),
Expand All @@ -482,6 +485,7 @@ impl CopyFormatParams<'static> {
&CopyFormatParams::Text(_) => "txt",
&CopyFormatParams::Csv(_) => "csv",
&CopyFormatParams::Binary => "bin",
&CopyFormatParams::Parquet => "parquet",
}
}

Expand All @@ -490,6 +494,7 @@ impl CopyFormatParams<'static> {
CopyFormatParams::Text(_) => false,
CopyFormatParams::Csv(params) => params.header,
CopyFormatParams::Binary => false,
CopyFormatParams::Parquet => false,
}
}
}
Expand All @@ -507,6 +512,10 @@ pub fn decode_copy_format<'a>(
io::ErrorKind::Unsupported,
"cannot decode as binary format",
)),
CopyFormatParams::Parquet => {
// TODO(cf2): Support Parquet over STDIN.
return Err(io::Error::new(io::ErrorKind::Unsupported, "parquet format"));
}
}
}

Expand All @@ -521,6 +530,10 @@ pub fn encode_copy_format<'a>(
CopyFormatParams::Text(params) => encode_copy_row_text(params, row, typ, out),
CopyFormatParams::Csv(params) => encode_copy_row_csv(params, row, typ, out),
CopyFormatParams::Binary => encode_copy_row_binary(row, typ, out),
CopyFormatParams::Parquet => {
// TODO(cf2): Support Parquet over STDIN.
return Err(io::Error::new(io::ErrorKind::Unsupported, "parquet format"));
}
}
}

Expand All @@ -546,6 +559,10 @@ pub fn encode_copy_format_header<'a>(
]);
encode_copy_row_csv(params, &header_row, &typ, out)
}
CopyFormatParams::Parquet => {
// TODO(cf2): Support Parquet over STDIN.
return Err(io::Error::new(io::ErrorKind::Unsupported, "parquet format"));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/sql/src/plan/statement/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ fn plan_copy_from(
)
}
CopyFormat::Binary => bail_unsupported!("FORMAT BINARY"),
CopyFormat::Parquet => bail_unsupported!("FORMAT PARQUET"),
CopyFormat::Parquet => CopyFormatParams::Parquet,
};

let filter = match (options.files, options.pattern) {
Expand Down
3 changes: 3 additions & 0 deletions src/storage-operators/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rust_library(
"//src/persist-types:mz_persist_types",
"//src/pgcopy:mz_pgcopy",
"//src/pgrepr:mz_pgrepr",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/storage-types:mz_storage_types",
"//src/timely-util:mz_timely_util",
Expand Down Expand Up @@ -81,6 +82,7 @@ rust_test(
"//src/persist-types:mz_persist_types",
"//src/pgcopy:mz_pgcopy",
"//src/pgrepr:mz_pgrepr",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/storage-types:mz_storage_types",
"//src/timely-util:mz_timely_util",
Expand All @@ -104,6 +106,7 @@ rust_doc_test(
"//src/persist-types:mz_persist_types",
"//src/pgcopy:mz_pgcopy",
"//src/pgrepr:mz_pgrepr",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/storage-types:mz_storage_types",
"//src/timely-util:mz_timely_util",
Expand Down
4 changes: 3 additions & 1 deletion src/storage-operators/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ mz-persist-client = { path = "../persist-client" }
mz-persist-types = { path = "../persist-types" }
mz-pgcopy = { path = "../pgcopy" }
mz-pgrepr = { path = "../pgrepr" }
mz-proto = { path = "../proto" }
mz-repr = { path = "../repr" }
mz-storage-types = { path = "../storage-types" }
mz-timely-util = { path = "../timely-util" }
mz-txn-wal = { path = "../txn-wal" }
parquet = { version = "53.3.0", default-features = false, features = ["arrow", "snap"] }
parquet = { version = "53.3.0", default-features = false, features = ["arrow", "async", "snap"] }
prometheus = { version = "0.13.3", default-features = false }
proptest = { version = "1.6.0", default-features = false, features = ["std"] }
prost = "0.13.2"
reqwest = { version = "0.11.13", features = ["stream"] }
sentry = { version = "0.29.1" }
serde = { version = "1.0.152", features = ["derive"] }
Expand Down
72 changes: 66 additions & 6 deletions src/storage-operators/src/oneshot_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ use tracing::info;
use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;
Expand Down Expand Up @@ -162,6 +164,10 @@ where
let format = CsvDecoder::new(params, &collection_meta.relation_desc);
FormatKind::Csv(format)
}
ContentFormat::Parquet => {
let format = ParquetFormat::new(collection_meta.relation_desc.clone());
FormatKind::Parquet(format)
}
};

// Discover what objects are available to copy.
Expand Down Expand Up @@ -627,6 +633,9 @@ pub trait OneshotObject {
/// Name of the object, including any extensions.
fn name(&self) -> &str;

/// Size of this object in bytes.
fn size(&self) -> usize;

/// Encodings of the _entire_ object, if any.
///
/// Note: The object may internally use compression, e.g. a Parquet file
Expand All @@ -645,11 +654,18 @@ pub enum Encoding {
}

/// Defines a remote system that we can fetch data from for a "one time" ingestion.
pub trait OneshotSource: Clone + Send {
pub trait OneshotSource: Clone + Send + Unpin {
/// An individual unit within the source, e.g. a file.
type Object: OneshotObject + Debug + Clone + Send + Serialize + DeserializeOwned + 'static;
type Object: OneshotObject
+ Debug
+ Clone
+ Send
+ Unpin
+ Serialize
+ DeserializeOwned
+ 'static;
/// Checksum for a [`Self::Object`].
type Checksum: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;
type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

/// Returns all of the objects for this source.
fn list<'a>(
Expand Down Expand Up @@ -744,6 +760,13 @@ impl OneshotObject for ObjectKind {
}
}

fn size(&self) -> usize {
match self {
ObjectKind::Http(object) => object.size(),
ObjectKind::AwsS3(object) => object.size(),
}
}

fn encodings(&self) -> &[Encoding] {
match self {
ObjectKind::Http(object) => object.encodings(),
Expand Down Expand Up @@ -781,7 +804,7 @@ pub trait OneshotFormat: Clone {

/// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
/// can later be decoded.
fn fetch_work<'a, S: OneshotSource + Sync>(
fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
&'a self,
source: &'a S,
request: Self::WorkRequest<S>,
Expand All @@ -801,9 +824,10 @@ pub trait OneshotFormat: Clone {
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
/// types.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
Csv(CsvDecoder),
Parquet(ParquetFormat),
}

impl OneshotFormat for FormatKind {
Expand All @@ -830,10 +854,20 @@ impl OneshotFormat for FormatKind {
.collect();
Ok(work)
}
FormatKind::Parquet(parquet) => {
let work = parquet
.split_work(source, object, checksum)
.await
.context("parquet")?
.into_iter()
.map(RequestKind::Parquet)
.collect();
Ok(work)
}
}
}

fn fetch_work<'a, S: OneshotSource + Sync>(
fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
&'a self,
source: &'a S,
request: Self::WorkRequest<S>,
Expand All @@ -844,6 +878,15 @@ impl OneshotFormat for FormatKind {
.map_ok(RecordChunkKind::Csv)
.map(|result| result.context("csv"))
.boxed(),
(FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
.fetch_work(source, request)
.map_ok(RecordChunkKind::Parquet)
.map(|result| result.context("parquet"))
.boxed(),
(FormatKind::Parquet(_), RequestKind::Csv(_))
| (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
unreachable!("programming error, {self:?}")
}
}
}

Expand All @@ -856,18 +899,27 @@ impl OneshotFormat for FormatKind {
(FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
csv.decode_chunk(chunk, rows).context("csv")
}
(FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
parquet.decode_chunk(chunk, rows).context("parquet")
}
(FormatKind::Parquet(_), RecordChunkKind::Csv(_))
| (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
unreachable!("programming error, {self:?}")
}
}
}
}

/// A unit of fetch work produced by a [`OneshotFormat`], generic over the
/// source's object type `O` and checksum type `C`.
///
/// One variant per supported format; `FormatKind::fetch_work` treats a
/// mismatched format/request pairing as a programming error.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    /// Work request produced by the CSV decoder.
    Csv(CsvWorkRequest<O, C>),
    /// Work request produced by the Parquet format.
    Parquet(ParquetWorkRequest<O, C>),
}

/// A chunk of fetched-but-not-yet-decoded data, one variant per supported
/// format; decoded into rows by `FormatKind::decode_chunk`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    /// A chunk of CSV records.
    Csv(CsvRecord),
    /// A Parquet row group.
    Parquet(ParquetRowGroup),
}

pub(crate) enum ObjectFilter {
Expand Down Expand Up @@ -926,6 +978,8 @@ impl fmt::Display for StorageErrorX {
pub enum StorageErrorXKind {
#[error("csv decoding error: {0}")]
CsvDecoding(Arc<str>),
#[error("parquet error: {0}")]
ParquetError(Arc<str>),
#[error("reqwest error: {0}")]
Reqwest(Arc<str>),
#[error("aws s3 request error: {0}")]
Expand Down Expand Up @@ -970,6 +1024,12 @@ impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
}
}

/// Convert errors from the `parquet` crate into our storage error kind.
///
/// Only the rendered message is retained (as a cheaply cloneable `Arc<str>`);
/// the original error value is dropped.
impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
    fn from(err: ::parquet::errors::ParquetError) -> Self {
        StorageErrorXKind::ParquetError(err.to_string().into())
    }
}

impl StorageErrorXKind {
pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
StorageErrorX {
Expand Down
4 changes: 4 additions & 0 deletions src/storage-operators/src/oneshot_source/aws_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl OneshotObject for S3Object {
&self.name
}

/// Size of this object in bytes (stored at listing time — see
/// `OneshotObject::size`).
fn size(&self) -> usize {
    self.size
}

fn encodings(&self) -> &[super::Encoding] {
&[]
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage-operators/src/oneshot_source/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::oneshot_source::{
Encoding, OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXKind,
};

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct CsvDecoder {
/// Properties of the CSV Reader.
params: CopyCsvFormatParams<'static>,
Expand Down Expand Up @@ -106,7 +106,7 @@ impl OneshotFormat for CsvDecoder {
Ok(vec![request])
}

fn fetch_work<'a, S: OneshotSource + Sync>(
fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
&'a self,
source: &'a S,
request: Self::WorkRequest<S>,
Expand Down
Loading

0 comments on commit 3d8c1bf

Please sign in to comment.