Skip to content

Commit

Permalink
fix: Fix grouping key reordering during spilling
Browse files Browse the repository at this point in the history
Summary:
When prefix sort is enabled, we reorder the grouping keys to maximize the benefit of prefix sort, as introduced in #11720.

However, when spilling happens and the spilled data is read back, there is a key order mismatch between the spilled data and the operator output. This can cause a segmentation fault when there is a RowType mismatch, or other type mismatch failures.

This PR fixes the issue by adding a spillDataLoader that holds the reordered grouping keys, loading the spilled data into it, and mapping the keys back to the result after loading.

Differential Revision: D69860326
  • Loading branch information
zation99 authored and facebook-github-bot committed Feb 20, 2025
1 parent 82e00de commit 9306ed1
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 16 deletions.
53 changes: 52 additions & 1 deletion velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,54 @@ bool GroupingSet::mergeNextWithAggregates(
VELOX_UNREACHABLE();
}

void GroupingSet::prepareSpillDataLoad(
int32_t maxOutputRows,
const RowVectorPtr& result) {
if (!spillConfig_->prefixSortEnabled()) {
spillDataLoader_ = result;
return;
}

unsigned int size = result->type()->size();

if (spillDataLoader_ == nullptr) {
std::vector<std::string> names;
std::vector<std::shared_ptr<const Type>> types;
names.resize(size);
types.resize(size);
RowTypePtr resultType =
std::static_pointer_cast<const RowType>(result->type());
for (auto i = 0; i < size; ++i) {
names[groupingKeyOutputProjections_[i]] =
dynamic_cast<const RowType*>(result->type().get())->nameOf(i);
types[groupingKeyOutputProjections_[i]] = result->childAt(i)->type();
}
spillDataLoader_ = BaseVector::create<RowVector>(
std::make_shared<RowType>(std::move(names), std::move(types)),
maxOutputRows,
&pool_);
} else {
VectorPtr spillDataLoader = std::move(spillDataLoader_);
BaseVector::prepareForReuse(spillDataLoader, maxOutputRows);
spillDataLoader_ = std::static_pointer_cast<RowVector>(spillDataLoader);
}

for (auto i = 0; i < size; ++i) {
spillDataLoader_->childAt(groupingKeyOutputProjections_[i]) =
std::move(result->childAt(i));
}
}

// Moves the child vectors held by 'spillDataLoader_' back into 'result':
// column 'i' of 'result' is taken from position
// groupingKeyOutputProjections_[i] of 'spillDataLoader_', the inverse of the
// permutation applied in prepareSpillDataLoad(). Does nothing when
// 'spillConfig_' reports prefix sort as disabled, since in that case
// prepareSpillDataLoad() made 'spillDataLoader_' an alias of 'result'.
//
// Must be called after prepareSpillDataLoad() on the same 'result';
// 'spillDataLoader_' is dereferenced without a null check.
void GroupingSet::restoreResult(const RowVectorPtr& result) {
  if (!spillConfig_->prefixSortEnabled()) {
    return;
  }
  // The row type of 'result' does not change while its children are
  // reassigned, so read the column count once.
  const unsigned int numColumns = result->type()->size();
  for (unsigned int i = 0; i < numColumns; ++i) {
    result->childAt(i) =
        std::move(spillDataLoader_->childAt(groupingKeyOutputProjections_[i]));
  }
}

bool GroupingSet::mergeNextWithoutAggregates(
int32_t maxOutputRows,
const RowVectorPtr& result) {
Expand All @@ -1215,6 +1263,8 @@ bool GroupingSet::mergeNextWithoutAggregates(
// less than 'numDistinctSpillFilesPerPartition_'.
bool newDistinct{true};
int32_t numOutputRows{0};
prepareSpillDataLoad(maxOutputRows, result);

while (numOutputRows < maxOutputRows) {
const auto next = merge_->nextWithEquals();
auto* stream = next.first;
Expand All @@ -1239,13 +1289,14 @@ bool GroupingSet::mergeNextWithoutAggregates(
}
if (newDistinct) {
// Yield result for new distinct.
result->copy(
spillDataLoader_->copy(
&stream->current(), numOutputRows++, stream->currentIndex(), 1);
}
stream->pop();
newDistinct = true;
}
result->resize(numOutputRows);
restoreResult(result);
return numOutputRows > 0;
}

Expand Down
11 changes: 11 additions & 0 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ class GroupingSet {
// index for this aggregation), otherwise it returns reference to activeRows_.
const SelectivityVector& getSelectivityVector(size_t aggregateIndex) const;

// Prepares 'spillDataLoader_' for loading spilled data. If spill prefix sort
// is disabled, 'spillDataLoader_' aliases 'result'. Otherwise it is created
// (or prepared for reuse) with 'maxOutputRows' rows and the children of
// 'result' are moved into it in the order given by
// 'groupingKeyOutputProjections_'.
void prepareSpillDataLoad(int32_t maxOutputRows, const RowVectorPtr& result);

// If spill prefix sort is enabled, moves the columns loaded into
// 'spillDataLoader_' back into 'result' in the output key order. No-op
// otherwise.
void restoreResult(const RowVectorPtr& result);

// Checks if input will fit in the existing memory and increases reservation
// if not. If reservation cannot be increased, spills enough to make 'input'
// fit.
Expand Down Expand Up @@ -336,6 +343,10 @@ class GroupingSet {
// First row in remainingInput_ that needs to be processed.
vector_size_t firstRemainingRow_;

// In case of grouping key reordering, spilled data is first loaded into
// 'spillDataLoader_', whose columns are then mapped back to the output key
// order and moved into the result.
RowVectorPtr spillDataLoader_{nullptr};

// The value of mayPushdown flag specified in addInput() for the
// 'remainingInput_'.
bool remainingMayPushdown_;
Expand Down
37 changes: 22 additions & 15 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,13 @@ TEST_F(AggregationTest, partialDistinctWithAbandon) {
}

TEST_F(AggregationTest, distinctWithGroupingKeysReordered) {
rowType_ = ROW(
{"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), VARCHAR(), VARCHAR()});
rowType_ =
ROW({"c0", "c1", "c2", "c3", "c4"},
{BIGINT(),
VARCHAR(),
INTEGER(),
ROW({"a0", "a1", "a2"}, {VARCHAR(), BOOLEAN(), BIGINT()}),
BOOLEAN()});

const int vectorSize = 2'000;
VectorFuzzer::Options options;
Expand All @@ -983,19 +988,21 @@ TEST_F(AggregationTest, distinctWithGroupingKeysReordered) {
// Distinct aggregation with grouping key with larger prefix encoded size
// first.
auto spillDirectory = exec::test::TempDirectoryPath::create();
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.config(QueryConfig::kAbandonPartialAggregationMinRows, 100)
.config(QueryConfig::kAbandonPartialAggregationMinPct, 50)
.spillDirectory(spillDirectory->getPath())
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kSpillPrefixSortEnabled, true)
.maxDrivers(1)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c2", "c0"}, {})
.planNode())
.assertResults("SELECT distinct c2, c0 FROM tmp");
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.config(QueryConfig::kAbandonPartialAggregationMinRows, 100)
.config(QueryConfig::kAbandonPartialAggregationMinPct, 50)
.spillDirectory(spillDirectory->getPath())
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kSpillPrefixSortEnabled, true)
.maxDrivers(1)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c4", "c1", "c3", "c2", "c0"}, {})
.planNode())
.assertResults("SELECT distinct c4, c1, c3, c2, c0 FROM tmp");
}

TEST_F(AggregationTest, largeValueRangeArray) {
Expand Down

0 comments on commit 9306ed1

Please sign in to comment.