-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support to specify partition key order in TableWrite operator #12355
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for meta-velox canceled.
|
@majetideepak Can you help to review this PR? Thanks for your help. |
velox/connectors/hive/HiveDataSink.h
Outdated
@@ -300,6 +306,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { | |||
const std::unordered_map<std::string, std::string> serdeParameters_; | |||
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_; | |||
const bool ensureFiles_; | |||
const std::vector<std::string> partitionKeys_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this used? I don't think Velox needs to be aware of these, it's not Velox who is creating the directories.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Yuhta Thanks for your review.
The HiveDataSink
generates partition directories based on the partition index derived from the input RowVector
. Currently, the partition index is constructed by traversing the RowVector
to determine if a column is partitioned. This approach can lead to a mismatch in partition key order if it differs from the order in the RowVector
. For instance, if the input RowVector
is (a, b, c)
and the partition key is set as (b, a), Velox will create directories as (a={}/b={})
based on the existing logic. This does not align with Spark's partition directory format, which would be (b={}/a={})
. To address this, we have introduced a partitionKey parameter in the HiveInsertTableHandle
. This allows us to generate the partition index according to the specified partitionKey order, ensuring alignment with the user's desired partition key sequence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, can we rename this method to partitionKeyOrder()
? And comment that if specified, we use this order, otherwise we use the column order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Yuhta Yes. Updated the method name to partitionKeyOrder()
. And also added the related comments.
velox/connectors/hive/HiveDataSink.h
Outdated
@@ -300,6 +306,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle { | |||
const std::unordered_map<std::string, std::string> serdeParameters_; | |||
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_; | |||
const bool ensureFiles_; | |||
const std::vector<std::string> partitionKeys_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, can we rename this method to partitionKeyOrder()
? And comment that if specified, we use this order, otherwise we use the column order
@Yuhta I have resolved all your comments. Can you help to review again? Thanks. |
@JkSelf There are build failures. Can you take a look? |
velox/connectors/hive/HiveDataSink.h
Outdated
if (partitionKeyOrder_.size() > 0) { | ||
// Ensure the partitionKeyOrder contains all the partition keys in | ||
// inputColumns_. | ||
std::string partitionKeyNames; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to also check there is no repetition. Something like
folly::F14FastSet<std::string> partitionKeyNames(partitionKeyOrder_.begin(), partitionKeyOrder_.end());
//...
VELOX_CHECK(partitionKeyNames.erase(inputColumn->name()) == 1);
//...
VELOX_CHECK(partitionKeyNames.empty());
Also in Presto we have convention to put all partitioning columns at the beginning of the RowVector
in the right order. Is there anything in Spark preventing us from doing so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Yuhta There is no such convention in Spark to put the partition columns at the beginning of the input columns.
@Yuhta @majetideepak Can you help to review again? Thanks. |
Fix apache/incubator-gluten#8663
The
HiveDataSink
generates partition directories based on the partition index derived from the inputRowVector
. Currently, the partition index is constructed by traversing theRowVector
to determine if a column is partitioned. This approach can lead to a mismatch in partition key order if it differs from the order in theRowVector
. For instance, if the inputRowVector
is(a, b, c)
and the partition key is set as(b, a)
, Velox will create directories as(a={}/b={})
based on the existing logic. This does not align with Spark's partition directory format, which would be(b={}/a={})
. To address this, we have introduced a partitionKey parameter in theHiveInsertTableHandle
. This allows us to generate the partition index according to the specified partitionKey order, ensuring alignment with the user's desired partition key sequence.