Skip to content

Commit

Permalink
fix: Flaky memory pool already exists failure in TableScanReplayerTe…
Browse files Browse the repository at this point in the history
…st (#12370)

Summary:
`OperatorReplayerBase` creates a query context for each replay query and,
as part of that, creates a memory pool. The pool name needs a thread-safe,
auto-incrementing ID, because CI runs unit tests concurrently and duplicate
pool names trigger the flaky failure.

Fixes #12360

Pull Request resolved: #12370

Reviewed By: tanjialiang

Differential Revision: D69798321

Pulled By: xiaoxmeng

fbshipit-source-id: 9eb6dcd80cb2b2fb8f1fc2cbe198cf2ebd86cae6
  • Loading branch information
duanmeng authored and facebook-github-bot committed Feb 19, 2025
1 parent 9442cef commit f4ca6e5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
3 changes: 2 additions & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() {
}

std::shared_ptr<core::QueryCtx> OperatorReplayerBase::createQueryCtx() {
static std::atomic_uint64_t replayQueryId{0};
auto queryPool = memory::memoryManager()->addRootPool(
fmt::format("{}_replayer_{}", operatorType_, replayQueryId_++),
fmt::format("{}_replayer_{}", operatorType_, replayQueryId++),
queryCapacity_);
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
connectorConfigs;
Expand Down
1 change: 0 additions & 1 deletion velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class OperatorReplayerBase {
connectorConfigs_;
core::PlanNodePtr planFragment_;
core::PlanNodeId replayPlanNodeId_;
std::atomic_uint64_t replayQueryId_{0};

void printStats(const std::shared_ptr<exec::Task>& task) const;

Expand Down
48 changes: 48 additions & 0 deletions velox/tool/trace/tests/TableScanReplayerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,52 @@ TEST_F(TableScanReplayerTest, subfieldPrunning) {
.run();
assertEqualResults({results}, {replayingResult});
}

// Replays one recorded TableScan trace from several threads at the same time
// and verifies that every replay produces the same rows as the traced query.
//
// Steps:
//   1. Write two split files and run a traced table scan over them.
//   2. Start kNumReplayThreads threads, each constructing its own
//      TableScanReplayer against the same trace directory and running it.
//   3. Join all threads; each thread asserts its own replay result.
TEST_F(TableScanReplayerTest, concurrent) {
  // Number of replays that run in parallel against the same trace.
  constexpr int kNumReplayThreads = 8;

  const auto vectors = makeVectors(2, 10);
  const auto testDir = TempDirectoryPath::create();
  const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot");
  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
  for (int i = 0; i < 2; ++i) {
    auto filePath = TempFilePath::create();
    writeToFile(filePath->getPath(), vectors);
    splitFiles.push_back(std::move(filePath));
  }

  // Run the query once with tracing enabled to produce the trace data that
  // the replayers consume, and keep its output as the expected result.
  const auto plan = tableScanNode();
  std::shared_ptr<Task> task;
  auto traceResult =
      AssertQueryBuilder(plan)
          .maxDrivers(4)
          .config(core::QueryConfig::kQueryTraceEnabled, true)
          .config(core::QueryConfig::kQueryTraceDir, traceRoot)
          .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
          .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
          .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_)
          .splits(makeHiveConnectorSplits(splitFiles))
          .copyResults(pool(), task);

  // Resolve the identifiers once on the main thread so the worker threads
  // only read plain local values instead of going through the shared task.
  const auto queryId = task->queryCtx()->queryId();
  const auto taskId = task->taskId();

  std::vector<std::thread> threads;
  threads.reserve(kNumReplayThreads);
  for (int i = 0; i < kNumReplayThreads; ++i) {
    // Capturing by reference is safe here: every thread is joined below,
    // before any of the captured locals go out of scope.
    threads.emplace_back([&]() {
      const auto replayingResult = TableScanReplayer(
                                       traceRoot,
                                       queryId,
                                       taskId,
                                       traceNodeId_,
                                       "TableScan",
                                       "",
                                       0,
                                       executor_.get())
                                       .run();
      assertEqualResults({traceResult}, {replayingResult});
    });
  }
  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace facebook::velox::tool::trace::test

0 comments on commit f4ca6e5

Please sign in to comment.