Skip to content

Commit

Permalink
fix: Fix grouping key reordering during spilling (#12395)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #12395

When prefix sort is enabled, we sort the grouping keys to maximize the prefixsort benefit as introduced in #11720.

However, when spilling happens and the spilled data is read back, there is a key order mismatch between the spilled data and the operator output. This can cause a segmentation fault when there is a RowType mismatch, or other type mismatch failures.

This PR fixes this by adding a spill data loader (`spillResultWithoutAggregates_`) which has the reordered grouping keys: the spilled data is loaded into it, and the keys are mapped back to the result after loading.

Reviewed By: xiaoxmeng

Differential Revision: D69860326

fbshipit-source-id: ca1aa35cfe08124709a72cf4eeddb0b2b3731102
  • Loading branch information
zation99 authored and facebook-github-bot committed Feb 21, 2025
1 parent 9468981 commit 4adec18
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 17 deletions.
47 changes: 45 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,46 @@ bool GroupingSet::mergeNextWithAggregates(
VELOX_UNREACHABLE();
}

// Prepares 'spillResultWithoutAggregates_' to receive up to 'maxOutputRows'
// rows of spilled grouping-key data. Spilled rows are laid out in the
// (possibly reordered) key order of the row container, so the vector's
// children take 'result's column names mapped through
// 'groupingKeyOutputProjections_'. 'result's child vectors are handed over to
// the spill vector so the merge can write directly into them.
void GroupingSet::prepareSpillResultWithoutAggregates(
    int32_t maxOutputRows,
    const RowVectorPtr& result) {
  const auto numColumns = result->type()->size();
  if (spillResultWithoutAggregates_ == nullptr) {
    std::vector<std::string> names(numColumns);
    VELOX_CHECK_EQ(table_->rows()->keyTypes().size(), numColumns);
    std::vector<TypePtr> types{table_->rows()->keyTypes()};

    // Hold the downcast result in a pointer (not a const& bound to a
    // temporary pointer) and verify it, so a non-row result type fails fast
    // instead of dereferencing null below.
    const auto* resultRowType =
        dynamic_cast<const RowType*>(result->type().get());
    VELOX_CHECK_NOT_NULL(resultRowType);
    for (auto i = 0; i < numColumns; ++i) {
      names[groupingKeyOutputProjections_[i]] = resultRowType->nameOf(i);
    }
    spillResultWithoutAggregates_ = BaseVector::create<RowVector>(
        std::make_shared<RowType>(std::move(names), std::move(types)),
        maxOutputRows,
        &pool_);
  } else {
    // BaseVector::prepareForReuse() takes a VectorPtr&, so temporarily move
    // the member into a plain VectorPtr and cast back afterwards.
    VectorPtr spillResultWithoutAggregates =
        std::move(spillResultWithoutAggregates_);
    BaseVector::prepareForReuse(spillResultWithoutAggregates, maxOutputRows);
    spillResultWithoutAggregates_ =
        std::static_pointer_cast<RowVector>(spillResultWithoutAggregates);
  }

  VELOX_CHECK_NOT_NULL(spillResultWithoutAggregates_);
  // Donate 'result's children at their reordered (spill-order) positions.
  for (auto i = 0; i < numColumns; ++i) {
    spillResultWithoutAggregates_->childAt(groupingKeyOutputProjections_[i]) =
        std::move(result->childAt(i));
  }
}

// Moves the grouping-key columns loaded into 'spillResultWithoutAggregates_'
// back into 'result', undoing the key reordering applied for spilling, and
// sizes 'result' to the number of rows produced.
void GroupingSet::projectResult(const RowVectorPtr& result) {
  const auto numKeys = result->type()->size();
  for (auto outputChannel = 0; outputChannel < numKeys; ++outputChannel) {
    const auto spillChannel = groupingKeyOutputProjections_[outputChannel];
    result->childAt(outputChannel) =
        std::move(spillResultWithoutAggregates_->childAt(spillChannel));
  }
  result->resize(spillResultWithoutAggregates_->size());
}

bool GroupingSet::mergeNextWithoutAggregates(
int32_t maxOutputRows,
const RowVectorPtr& result) {
Expand All @@ -1215,6 +1255,8 @@ bool GroupingSet::mergeNextWithoutAggregates(
// less than 'numDistinctSpillFilesPerPartition_'.
bool newDistinct{true};
int32_t numOutputRows{0};
prepareSpillResultWithoutAggregates(maxOutputRows, result);

while (numOutputRows < maxOutputRows) {
const auto next = merge_->nextWithEquals();
auto* stream = next.first;
Expand All @@ -1239,13 +1281,14 @@ bool GroupingSet::mergeNextWithoutAggregates(
}
if (newDistinct) {
// Yield result for new distinct.
result->copy(
spillResultWithoutAggregates_->copy(
&stream->current(), numOutputRows++, stream->currentIndex(), 1);
}
stream->pop();
newDistinct = true;
}
result->resize(numOutputRows);
spillResultWithoutAggregates_->resize(numOutputRows);
projectResult(result);
return numOutputRows > 0;
}

Expand Down
15 changes: 15 additions & 0 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@ class GroupingSet {
// index for this aggregation), otherwise it returns reference to activeRows_.
const SelectivityVector& getSelectivityVector(size_t aggregateIndex) const;

// Prepare spillResultWithoutAggregates_ for loading spilled data.
void prepareSpillResultWithoutAggregates(
int32_t maxOutputRows,
const RowVectorPtr& result);

// If prefix sort is enabled, projects the data read into
// 'spillResultWithoutAggregates_' back into 'result' in the original key
// order.
void projectResult(const RowVectorPtr& result);

// Checks if input will fit in the existing memory and increases reservation
// if not. If reservation cannot be increased, spills enough to make 'input'
// fit.
Expand Down Expand Up @@ -336,6 +345,12 @@ class GroupingSet {
// First row in remainingInput_ that needs to be processed.
vector_size_t firstRemainingRow_;

// For distinct aggregations (no aggregates) with reordered grouping keys,
// spilled data is first loaded into 'spillResultWithoutAggregates_' and then
// projected back into 'result' in the original key order.
RowVectorPtr spillResultWithoutAggregates_{nullptr};

// The value of mayPushdown flag specified in addInput() for the
// 'remainingInput_'.
bool remainingMayPushdown_;
Expand Down
37 changes: 22 additions & 15 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,13 @@ TEST_F(AggregationTest, partialDistinctWithAbandon) {
}

TEST_F(AggregationTest, distinctWithGroupingKeysReordered) {
rowType_ = ROW(
{"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), VARCHAR(), VARCHAR()});
rowType_ =
ROW({"c0", "c1", "c2", "c3", "c4"},
{BIGINT(),
VARCHAR(),
INTEGER(),
ROW({"a0", "a1", "a2"}, {VARCHAR(), BOOLEAN(), BIGINT()}),
BOOLEAN()});

const int vectorSize = 2'000;
VectorFuzzer::Options options;
Expand All @@ -983,19 +988,21 @@ TEST_F(AggregationTest, distinctWithGroupingKeysReordered) {
// Distinct aggregation with grouping key with larger prefix encoded size
// first.
auto spillDirectory = exec::test::TempDirectoryPath::create();
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.config(QueryConfig::kAbandonPartialAggregationMinRows, 100)
.config(QueryConfig::kAbandonPartialAggregationMinPct, 50)
.spillDirectory(spillDirectory->getPath())
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kSpillPrefixSortEnabled, true)
.maxDrivers(1)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c2", "c0"}, {})
.planNode())
.assertResults("SELECT distinct c2, c0 FROM tmp");
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.config(QueryConfig::kAbandonPartialAggregationMinRows, 100)
.config(QueryConfig::kAbandonPartialAggregationMinPct, 50)
.spillDirectory(spillDirectory->getPath())
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kSpillPrefixSortEnabled, true)
.maxDrivers(1)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c4", "c1", "c3", "c2", "c0"}, {})
.planNode())
.assertResults("SELECT distinct c4, c1, c3, c2, c0 FROM tmp");
}

TEST_F(AggregationTest, largeValueRangeArray) {
Expand Down

0 comments on commit 4adec18

Please sign in to comment.