Skip to content

Commit

Permalink
Support empty relation write (#370)
Browse files Browse the repository at this point in the history
Co-authored-by: youxiduo <[email protected]>
  • Loading branch information
ulysses-you and ulysses-you authored Jul 25, 2023
1 parent 1bb2da3 commit ccb4075
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
35 changes: 27 additions & 8 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,26 @@ Writer::Writer(
stream_(std::make_shared<ArrowDataBufferSink>(
std::move(sink),
*generalPool_,
bufferGrowRatio_)),
schema_(schema) {
bufferGrowRatio_)) {
arrowContext_ = std::make_shared<ArrowContext>();
arrowContext_->properties = getArrowParquetWriterOptions(options);
arrowContext_->schema = schema;

if (arrowContext_->schema) {
// If the input iterator is empty, the writer would otherwise do nothing and
// produce an empty file. We should at least write the Parquet magic header so
// that the reader can recognize it as a valid Parquet file. So we initialize
// the writer up front, even if there is no data.
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
PARQUET_ASSIGN_OR_THROW(
arrowContext_->writer,
::parquet::arrow::FileWriter::Open(
*arrowContext_->schema.get(),
arrow::default_memory_pool(),
stream_,
arrowContext_->properties,
arrowProperties));
}
}

Writer::Writer(
Expand Down Expand Up @@ -200,20 +216,23 @@ void Writer::flush() {
*/
void Writer::write(const VectorPtr& data) {
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get());
exportToArrow(data, schema);
std::shared_ptr<arrow::RecordBatch> recordBatch;
if (schema_) {
if (arrowContext_->schema) {
PARQUET_ASSIGN_OR_THROW(
recordBatch, arrow::ImportRecordBatch(&array, schema_));
recordBatch, arrow::ImportRecordBatch(&array, arrowContext_->schema));
} else {
ArrowSchema schema;
exportToArrow(data, schema);
PARQUET_ASSIGN_OR_THROW(
recordBatch, arrow::ImportRecordBatch(&array, &schema));
}

if (!arrowContext_->schema) {
arrowContext_->schema = recordBatch->schema();
if (arrowContext_->stagingChunks.empty()) {
if (!arrowContext_->schema) {
arrowContext_->schema = recordBatch->schema();
}

for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields();
colIdx++) {
arrowContext_->stagingChunks.push_back(
Expand Down
2 changes: 0 additions & 2 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ class Writer : public dwio::common::Writer {
std::shared_ptr<ArrowDataBufferSink> stream_;

std::shared_ptr<ArrowContext> arrowContext_;

std::shared_ptr<arrow::Schema> schema_;
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
Expand Down

0 comments on commit ccb4075

Please sign in to comment.