Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Feb 19, 2025
1 parent bb9c90b commit 9e25d80
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
std::vector<column_index_t> channels;

if (insertTableHandle->partitionKeys().size() > 0) {
auto partitionKeys = insertTableHandle->partitionKeys();
if (insertTableHandle->partitionKeyOrder().size() > 0) {
auto partitionKeys = insertTableHandle->partitionKeyOrder();
for (auto partitionKeyName : partitionKeys) {
for (column_index_t j = 0; j < insertTableHandle->inputColumns().size();
j++) {
Expand Down
28 changes: 23 additions & 5 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
// if there's no data. This is useful when the table is bucketed, but the
// engine handles ensuring a 1 to 1 mapping from task to bucket.
const bool ensureFiles = false,
const std::vector<std::string> partitionKeys = {})
const std::vector<std::string> partitionKeyOrder = {})
: inputColumns_(std::move(inputColumns)),
locationHandle_(std::move(locationHandle)),
storageFormat_(storageFormat),
Expand All @@ -219,7 +219,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
serdeParameters_(serdeParameters),
writerOptions_(writerOptions),
ensureFiles_(ensureFiles),
partitionKeys_(partitionKeys) {
partitionKeyOrder_(partitionKeyOrder) {
if (compressionKind.has_value()) {
VELOX_CHECK(
compressionKind.value() != common::CompressionKind_MAX,
Expand All @@ -240,6 +240,21 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
"ensureFiles is not supported with partition keys in the data");
}
}

if (!partitionKeyOrder_.empty()) {
  // Ensure the partitionKeyOrder contains all the partition keys in
  // inputColumns_. Only the number of keys is compared here: collect the
  // names of the input columns flagged as partition keys and require that
  // there are exactly as many of them as entries in partitionKeyOrder_.
  //
  // NOTE: the collection must be a vector of names; a plain std::string has
  // no emplace_back() and would not compile.
  std::vector<std::string> partitionKeyNames;
  for (const auto& inputColumn : inputColumns_) {
    if (inputColumn->isPartitionKey()) {
      partitionKeyNames.emplace_back(inputColumn->name());
    }
  }

  VELOX_CHECK(
      partitionKeyNames.size() == partitionKeyOrder_.size(),
      "partition key order size is not equal with the partition column size in inputColumns_");
}
}

virtual ~HiveInsertTableHandle() = default;
Expand Down Expand Up @@ -273,8 +288,11 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
return ensureFiles_;
}

const std::vector<std::string>& partitionKeys() const {
return partitionKeys_;
/// Returns the partition key order supplied at construction (empty by
/// default). If partitionKeyOrder is specified, the partition directory will
/// be created according to this order. Otherwise, it will follow the order of
/// columns in inputColumns_.
const std::vector<std::string>& partitionKeyOrder() const {
return partitionKeyOrder_;
}

bool supportsMultiThreading() const override {
Expand Down Expand Up @@ -306,7 +324,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
const std::unordered_map<std::string, std::string> serdeParameters_;
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_;
const bool ensureFiles_;
const std::vector<std::string> partitionKeys_;
const std::vector<std::string> partitionKeyOrder_;
};

/// Parameters for Hive writers.
Expand Down

0 comments on commit 9e25d80

Please sign in to comment.