-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add H2O.ai Database-like Ops benchmark to dfbench (groupby support) #13996
Conversation
ae80f69
to
34fd4d2
Compare
The PR testing result: ./benchmarks/bench.sh data h2o_small
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: h2o_small
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Python version 3.9 found, but version 3.10 or higher is required.
Using Python command: python3.12
Installing falsa...
Generating h2o test data in /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o with size=SMALL and format=PARQUET
10000000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet
An output data schema is the following:
id1: string
id2: string
id3: string
id4: int64
id5: int64
id6: int64
v1: int64 not null
v2: int64 not null
v3: double not null
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:04 Running example, we can both use /benchmarks/bench.sh run or cargo run: ./benchmarks/bench.sh run h2o_small
***************************
DataFusion Benchmark Script
COMMAND: run
BENCHMARK: h2o_small
DATAFUSION_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/..
BRANCH_NAME: issue_7209
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
RESULTS_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
RESULTS_FILE: /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209/h2o.json
Running h2o benchmark...
Compiling datafusion-benchmarks v44.0.0 (/Users/zhuqi/arrow-datafusion/benchmarks)
Building [=======================> ] 337/338: dfbench(bin)
Finished `release` profile [optimized] target(s) in 4m 41s
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench h2o --iterations 3 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/groupby.sql -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209/h2o.json`
Running benchmarks with the following options: RunOpt { query: None, common: CommonOpt { iterations: 3, partitions: None, batch_size: 8192, debug: false }, queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/groupby.sql", path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209/h2o.json") }
Q1: SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1;
Query 1 iteration 1 took 58.3 ms and returned 100 rows
Query 1 iteration 2 took 18.8 ms and returned 100 rows
Query 1 iteration 3 took 19.1 ms and returned 100 rows
Q2: SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2;
Query 2 iteration 1 took 196.0 ms and returned 6321413 rows
Query 2 iteration 2 took 148.5 ms and returned 6321413 rows
Query 2 iteration 3 took 142.1 ms and returned 6321413 rows
Q3: SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3;
Query 3 iteration 1 took 113.4 ms and returned 100000 rows
Query 3 iteration 2 took 113.1 ms and returned 100000 rows
Query 3 iteration 3 took 107.0 ms and returned 100000 rows
Q4: SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4;
Query 4 iteration 1 took 28.0 ms and returned 100 rows
Query 4 iteration 2 took 41.5 ms and returned 100 rows
Query 4 iteration 3 took 44.1 ms and returned 100 rows
Q5: SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6;
Query 5 iteration 1 took 64.1 ms and returned 100000 rows
Query 5 iteration 2 took 52.1 ms and returned 100000 rows
Query 5 iteration 3 took 50.0 ms and returned 100000 rows
Q6: SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
Query 6 iteration 1 took 225.0 ms and returned 10000 rows
Query 6 iteration 2 took 245.5 ms and returned 10000 rows
Query 6 iteration 3 took 224.8 ms and returned 10000 rows
Q7: SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3;
Query 7 iteration 1 took 111.0 ms and returned 100000 rows
Query 7 iteration 2 took 97.4 ms and returned 100000 rows
Query 7 iteration 3 took 95.1 ms and returned 100000 rows
Q8: SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2;
Query 8 iteration 1 took 386.7 ms and returned 200000 rows
Query 8 iteration 2 took 309.7 ms and returned 200000 rows
Query 8 iteration 3 took 301.9 ms and returned 200000 rows
Q9: SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4;
Query 9 iteration 1 took 614.5 ms and returned 6320797 rows
Query 9 iteration 2 took 572.8 ms and returned 6320797 rows
Query 9 iteration 3 took 591.2 ms and returned 6320797 rows
Q10: SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6;
Query 10 iteration 1 took 492.9 ms and returned 10000000 rows
Query 10 iteration 2 took 332.5 ms and returned 10000000 rows
Query 10 iteration 3 took 375.3 ms and returned 10000000 rows
Done cargo run --release --bin dfbench -- h2o --query 3 --debug
Finished `release` profile [optimized] target(s) in 0.22s
Running `target/release/dfbench h2o --query 3 --debug`
Running benchmarks with the following options: RunOpt { query: Some(3), common: CommonOpt { iterations: 3, partitions: None, batch_size: 8192, debug: true }, queries_path: "benchmarks/queries/h2o/groupby.sql", path: "benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet", output_path: None }
Q3: SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3;
Query 3 iteration 1 took 165.0 ms and returned 100000 rows
Query 3 iteration 2 took 112.6 ms and returned 100000 rows
Query 3 iteration 3 took 114.8 ms and returned 100000 rows
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: x.id3, sum(x.v1) AS v1, avg(x.v3) AS v3 |
| | Aggregate: groupBy=[[x.id3]], aggr=[[sum(x.v1), avg(x.v3)]] |
| | TableScan: x projection=[id3, v1, v3] |
| physical_plan | ProjectionExec: expr=[id3@0 as id3, sum(x.v1)@1 as v1, avg(x.v3)@2 as v3] |
| | AggregateExec: mode=FinalPartitioned, gby=[id3@0 as id3], aggr=[sum(x.v1), avg(x.v3)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([id3@0], 14), input_partitions=14 |
| | AggregateExec: mode=Partial, gby=[id3@0 as id3], aggr=[sum(x.v1), avg(x.v3)] |
| | ParquetExec: file_groups={14 groups: [[Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet:0..18252411], [Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet:18252411..36504822], [Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet:36504822..54757233], [Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet:54757233..73009644], [Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet:73009644..91262055], ...]}, projection=[id3, v1, v3] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, I have tried and there is an issue generating data, everything else looks good to me.
When I run ./bench.sh data h2o_medum
with python 3.13
...
error: the configured Python interpreter version (3.13) is newer than PyO3's maximum supported version (3.12)
= help: please check if an updated version of PyO3 is available. Current version: 0.20.3
= help: set PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1 to suppress this check and build anyway using the stable ABI
warning: build failed, waiting for other jobs to finish...
💥 maturin failed
...
The error showed up, I think falsa
does not support python 3.13.
Perhaps we can enforce [email protected] to suppress this issue now? In the future maybe we can use a docker image to generate h2o dataset instead.
long = "queries-path", | ||
default_value = "benchmarks/queries/h2o/groupby.sql" | ||
)] | ||
queries_path: PathBuf, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we can remove this query path option? I think those queries should be static, and are unlikely to be placed elsewhere like large datasets
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @2010YOUY01 for review, i kept it, because we will also support join.sql soon, so we can switch the path.
benchmarks/bench.sh
Outdated
@@ -80,6 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file | |||
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet | |||
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) | |||
external_aggr: External aggregation benchmark | |||
h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is parquet | |||
h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is parquet | |||
h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is parquet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The benchmark results in https://duckdb.org/2023/04/14/h2oai.html is running on csv dataset, perhaps we can include a h2o_medium_csv
in this entry point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @2010YOUY01 for review, good suggestion, addressed in latest PR.
Thank you @2010YOUY01 for review, i fix the issue, now python 3.13 is also supported by testing: ./benchmarks/bench.sh data h2o_small
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: h2o_small
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Found Python version 3.13, which is suitable.
Using Python command: /usr/local/bin/python3
Installing falsa...
Generating h2o test data in /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o with size=SMALL and format=PARQUET
10000000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.parquet
An output data schema is the following:
id1: string
id2: string
id3: string
id4: int64
id5: int64
id6: int64
v1: int64 not null
v2: int64 not null
v3: double not null
An output format is PARQUET
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:04 And in future, we can change to docker support, it's a better solution i agree. |
Also @2010YOUY01 , updated, csv is supported now: ./benchmarks/bench.sh data h2o_small_csv
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: h2o_small_csv
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Found Python version 3.13, which is suitable.
Using Python command: /usr/local/bin/python3
Installing falsa...
Generating h2o test data in /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o with size=SMALL and format=CSV
10000000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.csv
An output data schema is the following:
id1: string
id2: string
id3: string
id4: int64
id5: int64
id6: int64
v1: int64 not null
v2: int64 not null
v3: double not null
An output format is CSV
Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:04 ./benchmarks/bench.sh run h2o_small_csv
***************************
DataFusion Benchmark Script
COMMAND: run
BENCHMARK: h2o_small_csv
DATAFUSION_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/..
BRANCH_NAME: issue_7209
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
RESULTS_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
RESULTS_FILE: /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209/h2o.json
Running h2o benchmark...
Finished `release` profile [optimized] target(s) in 0.30s
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench h2o --iterations 3 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/groupby.sql -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209/h2o.json`
Running benchmarks with the following options: RunOpt { query: None, common: CommonOpt { iterations: 3, partitions: None, batch_size: 8192, debug: false }, queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/groupby.sql", path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/G1_1e7_1e7_100_0.csv", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_7209/h2o.json") }
Q1: SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1;
Query 1 iteration 1 took 131.4 ms and returned 100 rows
Query 1 iteration 2 took 111.8 ms and returned 100 rows
Query 1 iteration 3 took 108.0 ms and returned 100 rows
Q2: SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2;
Query 2 iteration 1 took 267.1 ms and returned 6321413 rows
Query 2 iteration 2 took 240.0 ms and returned 6321413 rows
Query 2 iteration 3 took 235.2 ms and returned 6321413 rows
Q3: SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3;
Query 3 iteration 1 took 187.3 ms and returned 100000 rows
Query 3 iteration 2 took 204.2 ms and returned 100000 rows
Query 3 iteration 3 took 218.2 ms and returned 100000 rows
Q4: SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4;
Query 4 iteration 1 took 145.2 ms and returned 100 rows
Query 4 iteration 2 took 144.7 ms and returned 100 rows
Query 4 iteration 3 took 128.9 ms and returned 100 rows
Q5: SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6;
Query 5 iteration 1 took 165.3 ms and returned 100000 rows
Query 5 iteration 2 took 161.1 ms and returned 100000 rows
Query 5 iteration 3 took 163.0 ms and returned 100000 rows
Q6: SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
Query 6 iteration 1 took 302.7 ms and returned 10000 rows
Query 6 iteration 2 took 299.9 ms and returned 10000 rows
Query 6 iteration 3 took 294.8 ms and returned 10000 rows
Q7: SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3;
Query 7 iteration 1 took 181.5 ms and returned 100000 rows
Query 7 iteration 2 took 171.4 ms and returned 100000 rows
Query 7 iteration 3 took 189.5 ms and returned 100000 rows
Q8: SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2;
Query 8 iteration 1 took 382.6 ms and returned 200000 rows
Query 8 iteration 2 took 366.2 ms and returned 200000 rows
Query 8 iteration 3 took 361.9 ms and returned 200000 rows
Q9: SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4;
Query 9 iteration 1 took 685.0 ms and returned 6320797 rows
Query 9 iteration 2 took 711.7 ms and returned 6320797 rows
Query 9 iteration 3 took 725.4 ms and returned 6320797 rows
Q10: SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6;
Query 10 iteration 1 took 583.5 ms and returned 10000000 rows
Query 10 iteration 2 took 539.3 ms and returned 10000000 rows
Query 10 iteration 3 took 560.9 ms and returned 10000000 rows
Done |
Hi @alamb |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @zhuqi-lucas and @2010YOUY01
I tried this out locally and it worked really nicely. Thank you
I think the following follow on tasks would be valuable:
- Document this benchmark in https://github.com/apache/datafusion/tree/main/benchmarks#benchmarks
- Remove the old copy of the h2o benchmark in https://github.com/apache/datafusion/blob/main/benchmarks/src/bin/h2o.rs
I can try and help over the next day or two
fi | ||
|
||
# Search for suitable Python versions if the default is unsuitable | ||
if [ -z "$PYTHON_CMD" ]; then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is pretty fancy
I also think we maybe should also consider supporting fewer of these combinations (in follow on PRs) -- for example I am not sure how much value the parquet versions of the h2o tests are as the benchmark uses CSV (so that is what people care about about). We already have pretty good coverage for parquet in clickbench |
Thank you @alamb for review, i agree, addressed above comments in latest PR.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much @zhuqi-lucas -- this is amazing! I am sorry for the delay in merging. I was waiting until I had time to file follow on tickets and it turns out you had already done it.
THANK YOU again! I am really excited to see this work get into DataFusion ❤️
Thank you @alamb ! |
Which issue does this PR close?
Closes #7209
Rationale for this change
This PR only supported groupby, join support will in another PR.
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?