From daf182dc789230dbd9cf21ca2e975789213a5365 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 3 Apr 2024 09:09:20 -0400 Subject: [PATCH 1/2] Add TPCH-DS planning benchmark (#9907) * Move TPCH schema definition * Move schema definitions into test_util * Add tpcds planning benchmark --- datafusion/core/benches/sql_planner.rs | 167 +++---- datafusion/core/tests/tpcds_planning.rs | 578 +---------------------- test-utils/src/lib.rs | 18 + test-utils/src/tpcds.rs | 579 ++++++++++++++++++++++++ test-utils/src/tpch.rs | 118 +++++ 5 files changed, 774 insertions(+), 686 deletions(-) create mode 100644 test-utils/src/tpcds.rs create mode 100644 test-utils/src/tpch.rs diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 3f7d66f5cc15..3946b716afef 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -26,6 +26,9 @@ use arrow::datatypes::{DataType, Field, Fields, Schema}; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; use std::sync::Arc; +use test_utils::tpcds::tpcds_schemas; +use test_utils::tpch::tpch_schemas; +use test_utils::TableDef; use tokio::runtime::Runtime; /// Create a logical plan from the specified sql @@ -48,116 +51,18 @@ fn physical_plan(ctx: &SessionContext, sql: &str) { } /// Create schema with the specified number of columns -pub fn create_schema(column_prefix: &str, num_columns: usize) -> Schema { +fn create_schema(column_prefix: &str, num_columns: usize) -> Schema { let fields: Fields = (0..num_columns) .map(|i| Field::new(format!("{column_prefix}{i}"), DataType::Int32, true)) .collect(); Schema::new(fields) } -pub fn create_table_provider(column_prefix: &str, num_columns: usize) -> Arc<MemTable> { +fn create_table_provider(column_prefix: &str, num_columns: usize) -> Arc<MemTable> { let schema = Arc::new(create_schema(column_prefix, num_columns)); MemTable::try_new(schema, vec![]).map(Arc::new).unwrap() } -pub fn create_tpch_schemas() -> 
[(String, Schema); 8] { - let lineitem_schema = Schema::new(vec![ - Field::new("l_orderkey", DataType::Int64, false), - Field::new("l_partkey", DataType::Int64, false), - Field::new("l_suppkey", DataType::Int64, false), - Field::new("l_linenumber", DataType::Int32, false), - Field::new("l_quantity", DataType::Decimal128(15, 2), false), - Field::new("l_extendedprice", DataType::Decimal128(15, 2), false), - Field::new("l_discount", DataType::Decimal128(15, 2), false), - Field::new("l_tax", DataType::Decimal128(15, 2), false), - Field::new("l_returnflag", DataType::Utf8, false), - Field::new("l_linestatus", DataType::Utf8, false), - Field::new("l_shipdate", DataType::Date32, false), - Field::new("l_commitdate", DataType::Date32, false), - Field::new("l_receiptdate", DataType::Date32, false), - Field::new("l_shipinstruct", DataType::Utf8, false), - Field::new("l_shipmode", DataType::Utf8, false), - Field::new("l_comment", DataType::Utf8, false), - ]); - - let orders_schema = Schema::new(vec![ - Field::new("o_orderkey", DataType::Int64, false), - Field::new("o_custkey", DataType::Int64, false), - Field::new("o_orderstatus", DataType::Utf8, false), - Field::new("o_totalprice", DataType::Decimal128(15, 2), false), - Field::new("o_orderdate", DataType::Date32, false), - Field::new("o_orderpriority", DataType::Utf8, false), - Field::new("o_clerk", DataType::Utf8, false), - Field::new("o_shippriority", DataType::Int32, false), - Field::new("o_comment", DataType::Utf8, false), - ]); - - let part_schema = Schema::new(vec![ - Field::new("p_partkey", DataType::Int64, false), - Field::new("p_name", DataType::Utf8, false), - Field::new("p_mfgr", DataType::Utf8, false), - Field::new("p_brand", DataType::Utf8, false), - Field::new("p_type", DataType::Utf8, false), - Field::new("p_size", DataType::Int32, false), - Field::new("p_container", DataType::Utf8, false), - Field::new("p_retailprice", DataType::Decimal128(15, 2), false), - Field::new("p_comment", DataType::Utf8, false), - ]); 
- - let supplier_schema = Schema::new(vec![ - Field::new("s_suppkey", DataType::Int64, false), - Field::new("s_name", DataType::Utf8, false), - Field::new("s_address", DataType::Utf8, false), - Field::new("s_nationkey", DataType::Int64, false), - Field::new("s_phone", DataType::Utf8, false), - Field::new("s_acctbal", DataType::Decimal128(15, 2), false), - Field::new("s_comment", DataType::Utf8, false), - ]); - - let partsupp_schema = Schema::new(vec![ - Field::new("ps_partkey", DataType::Int64, false), - Field::new("ps_suppkey", DataType::Int64, false), - Field::new("ps_availqty", DataType::Int32, false), - Field::new("ps_supplycost", DataType::Decimal128(15, 2), false), - Field::new("ps_comment", DataType::Utf8, false), - ]); - - let customer_schema = Schema::new(vec![ - Field::new("c_custkey", DataType::Int64, false), - Field::new("c_name", DataType::Utf8, false), - Field::new("c_address", DataType::Utf8, false), - Field::new("c_nationkey", DataType::Int64, false), - Field::new("c_phone", DataType::Utf8, false), - Field::new("c_acctbal", DataType::Decimal128(15, 2), false), - Field::new("c_mktsegment", DataType::Utf8, false), - Field::new("c_comment", DataType::Utf8, false), - ]); - - let nation_schema = Schema::new(vec![ - Field::new("n_nationkey", DataType::Int64, false), - Field::new("n_name", DataType::Utf8, false), - Field::new("n_regionkey", DataType::Int64, false), - Field::new("n_comment", DataType::Utf8, false), - ]); - - let region_schema = Schema::new(vec![ - Field::new("r_regionkey", DataType::Int64, false), - Field::new("r_name", DataType::Utf8, false), - Field::new("r_comment", DataType::Utf8, false), - ]); - - [ - ("lineitem".to_string(), lineitem_schema), - ("orders".to_string(), orders_schema), - ("part".to_string(), part_schema), - ("supplier".to_string(), supplier_schema), - ("partsupp".to_string(), partsupp_schema), - ("customer".to_string(), customer_schema), - ("nation".to_string(), nation_schema), - ("region".to_string(), region_schema), - 
] -} - fn create_context() -> SessionContext { let ctx = SessionContext::new(); ctx.register_table("t1", create_table_provider("a", 200)) @@ -168,16 +73,19 @@ fn create_context() -> SessionContext { .unwrap(); ctx.register_table("t1000", create_table_provider("d", 1000)) .unwrap(); + ctx +} - let tpch_schemas = create_tpch_schemas(); - tpch_schemas.iter().for_each(|(name, schema)| { +/// Register the table definitions as a MemTable with the context and return the +/// context +fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext { + defs.iter().for_each(|TableDef { name, schema }| { ctx.register_table( name, Arc::new(MemTable::try_new(Arc::new(schema.clone()), vec![]).unwrap()), ) .unwrap(); }); - ctx } @@ -236,6 +144,10 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + // --- TPC-H --- + + let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas()); + let tpch_queries = [ "q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10", "q11", "q12", "q13", "q14", // "q15", q15 has multiple SQL statements which is not supported @@ -243,25 +155,24 @@ fn criterion_benchmark(c: &mut Criterion) { ]; for q in tpch_queries { - let sql = std::fs::read_to_string(format!("../../benchmarks/queries/{}.sql", q)) - .unwrap(); + let sql = + std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap(); c.bench_function(&format!("physical_plan_tpch_{}", q), |b| { - b.iter(|| physical_plan(&ctx, &sql)) + b.iter(|| physical_plan(&tpch_ctx, &sql)) }); } let all_tpch_sql_queries = tpch_queries .iter() .map(|q| { - std::fs::read_to_string(format!("../../benchmarks/queries/{}.sql", q)) - .unwrap() + std::fs::read_to_string(format!("../../benchmarks/queries/{q}.sql")).unwrap() }) .collect::<Vec<_>>(); c.bench_function("physical_plan_tpch_all", |b| { b.iter(|| { for sql in &all_tpch_sql_queries { - physical_plan(&ctx, sql) + physical_plan(&tpch_ctx, sql) } }) }); @@ -269,7 +180,43 @@ fn criterion_benchmark(c: &mut Criterion) { 
c.bench_function("logical_plan_tpch_all", |b| { b.iter(|| { for sql in &all_tpch_sql_queries { - logical_plan(&ctx, sql) + logical_plan(&tpch_ctx, sql) + } + }) + }); + + // --- TPC-DS --- + + let tpcds_ctx = register_defs(SessionContext::new(), tpcds_schemas()); + + // 10, 35: Physical plan does not support logical expression Exists(<subquery>) + // 45: Physical plan does not support logical expression (<subquery>) + // 41: Optimizing disjunctions not supported + let ignored = [10, 35, 41, 45]; + + let raw_tpcds_sql_queries = (1..100) + .filter(|q| !ignored.contains(q)) + .map(|q| std::fs::read_to_string(format!("./tests/tpc-ds/{q}.sql")).unwrap()) + .collect::<Vec<_>>(); + + // some queries have multiple statements + let all_tpcds_sql_queries = raw_tpcds_sql_queries + .iter() + .flat_map(|sql| sql.split(';').filter(|s| !s.trim().is_empty())) + .collect::<Vec<_>>(); + + c.bench_function("physical_plan_tpcds_all", |b| { + b.iter(|| { + for sql in &all_tpcds_sql_queries { + physical_plan(&tpcds_ctx, sql) + } + }) + }); + + c.bench_function("logical_plan_tpcds_all", |b| { + b.iter(|| { + for sql in &all_tpcds_sql_queries { + logical_plan(&tpcds_ctx, sql) } }) }); diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index e8d2c3764e0c..a4a85c6bd1f2 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -21,12 +21,12 @@ // Schema and queries used in these tests are copyright // 2015 Transaction Processing Performance Council -use arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::Result; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use std::fs; use std::sync::Arc; +use test_utils::tpcds::tpcds_schemas; #[tokio::test] async fn tpcds_logical_q1() -> Result<()> { @@ -1041,7 +1041,7 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> { let config = SessionConfig::default(); let ctx = SessionContext::new_with_config(config); 
- let tables = get_table_definitions(); + let tables = tpcds_schemas(); for table in &tables { ctx.register_table( table.name.as_str(), @@ -1066,577 +1066,3 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> { Ok(()) } - -pub struct TableDef { - pub name: String, - pub schema: Schema, -} - -impl TableDef { - fn new(name: &str, schema: Schema) -> Self { - Self { - name: name.to_string(), - schema, - } - } -} - -pub fn get_table_definitions() -> Vec<TableDef> { - vec![ - TableDef::new( - "catalog_sales", - Schema::new(vec![ - Field::new("cs_sold_date_sk", DataType::Int32, false), - Field::new("cs_sold_time_sk", DataType::Int32, false), - Field::new("cs_ship_date_sk", DataType::Int32, false), - Field::new("cs_bill_customer_sk", DataType::Int32, false), - Field::new("cs_bill_cdemo_sk", DataType::Int32, false), - Field::new("cs_bill_hdemo_sk", DataType::Int32, false), - Field::new("cs_bill_addr_sk", DataType::Int32, false), - Field::new("cs_ship_customer_sk", DataType::Int32, false), - Field::new("cs_ship_cdemo_sk", DataType::Int32, false), - Field::new("cs_ship_hdemo_sk", DataType::Int32, false), - Field::new("cs_ship_addr_sk", DataType::Int32, false), - Field::new("cs_call_center_sk", DataType::Int32, false), - Field::new("cs_catalog_page_sk", DataType::Int32, false), - Field::new("cs_ship_mode_sk", DataType::Int32, false), - Field::new("cs_warehouse_sk", DataType::Int32, false), - Field::new("cs_item_sk", DataType::Int32, false), - Field::new("cs_promo_sk", DataType::Int32, false), - Field::new("cs_order_number", DataType::Int64, false), - Field::new("cs_quantity", DataType::Int32, false), - Field::new("cs_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("cs_list_price", DataType::Decimal128(7, 2), false), - Field::new("cs_sales_price", DataType::Decimal128(7, 2), false), - Field::new("cs_ext_discount_amt", DataType::Decimal128(7, 2), false), - Field::new("cs_ext_sales_price", DataType::Decimal128(7, 2), false), - 
Field::new("cs_ext_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("cs_ext_list_price", DataType::Decimal128(7, 2), false), - Field::new("cs_ext_tax", DataType::Decimal128(7, 2), false), - Field::new("cs_coupon_amt", DataType::Decimal128(7, 2), false), - Field::new("cs_ext_ship_cost", DataType::Decimal128(7, 2), false), - Field::new("cs_net_paid", DataType::Decimal128(7, 2), false), - Field::new("cs_net_paid_inc_tax", DataType::Decimal128(7, 2), false), - Field::new("cs_net_paid_inc_ship", DataType::Decimal128(7, 2), false), - Field::new( - "cs_net_paid_inc_ship_tax", - DataType::Decimal128(7, 2), - false, - ), - Field::new("cs_net_profit", DataType::Decimal128(7, 2), false), - ]), - ), - TableDef::new( - "catalog_returns", - Schema::new(vec![ - Field::new("cr_returned_date_sk", DataType::Int32, false), - Field::new("cr_returned_time_sk", DataType::Int32, false), - Field::new("cr_item_sk", DataType::Int32, false), - Field::new("cr_refunded_customer_sk", DataType::Int32, false), - Field::new("cr_refunded_cdemo_sk", DataType::Int32, false), - Field::new("cr_refunded_hdemo_sk", DataType::Int32, false), - Field::new("cr_refunded_addr_sk", DataType::Int32, false), - Field::new("cr_returning_customer_sk", DataType::Int32, false), - Field::new("cr_returning_cdemo_sk", DataType::Int32, false), - Field::new("cr_returning_hdemo_sk", DataType::Int32, false), - Field::new("cr_returning_addr_sk", DataType::Int32, false), - Field::new("cr_call_center_sk", DataType::Int32, false), - Field::new("cr_catalog_page_sk", DataType::Int32, false), - Field::new("cr_ship_mode_sk", DataType::Int32, false), - Field::new("cr_warehouse_sk", DataType::Int32, false), - Field::new("cr_reason_sk", DataType::Int32, false), - Field::new("cr_order_number", DataType::Int64, false), - Field::new("cr_return_quantity", DataType::Int32, false), - Field::new("cr_return_amount", DataType::Decimal128(7, 2), false), - Field::new("cr_return_tax", DataType::Decimal128(7, 2), false), - 
Field::new("cr_return_amt_inc_tax", DataType::Decimal128(7, 2), false), - Field::new("cr_fee", DataType::Decimal128(7, 2), false), - Field::new("cr_return_ship_cost", DataType::Decimal128(7, 2), false), - Field::new("cr_refunded_cash", DataType::Decimal128(7, 2), false), - Field::new("cr_reversed_charge", DataType::Decimal128(7, 2), false), - Field::new("cr_store_credit", DataType::Decimal128(7, 2), false), - Field::new("cr_net_loss", DataType::Decimal128(7, 2), false), - ]), - ), - TableDef::new( - "inventory", - Schema::new(vec![ - Field::new("inv_date_sk", DataType::Int32, false), - Field::new("inv_item_sk", DataType::Int32, false), - Field::new("inv_warehouse_sk", DataType::Int32, false), - Field::new("inv_quantity_on_hand", DataType::Int32, false), - ]), - ), - TableDef::new( - "store_sales", - Schema::new(vec![ - Field::new("ss_sold_date_sk", DataType::Int32, false), - Field::new("ss_sold_time_sk", DataType::Int32, false), - Field::new("ss_item_sk", DataType::Int32, false), - Field::new("ss_customer_sk", DataType::Int32, false), - Field::new("ss_cdemo_sk", DataType::Int32, false), - Field::new("ss_hdemo_sk", DataType::Int32, false), - Field::new("ss_addr_sk", DataType::Int32, false), - Field::new("ss_store_sk", DataType::Int32, false), - Field::new("ss_promo_sk", DataType::Int32, false), - Field::new("ss_ticket_number", DataType::Int64, false), - Field::new("ss_quantity", DataType::Int32, false), - Field::new("ss_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("ss_list_price", DataType::Decimal128(7, 2), false), - Field::new("ss_sales_price", DataType::Decimal128(7, 2), false), - Field::new("ss_ext_discount_amt", DataType::Decimal128(7, 2), false), - Field::new("ss_ext_sales_price", DataType::Decimal128(7, 2), false), - Field::new("ss_ext_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("ss_ext_list_price", DataType::Decimal128(7, 2), false), - Field::new("ss_ext_tax", DataType::Decimal128(7, 2), false), - 
Field::new("ss_coupon_amt", DataType::Decimal128(7, 2), false), - Field::new("ss_net_paid", DataType::Decimal128(7, 2), false), - Field::new("ss_net_paid_inc_tax", DataType::Decimal128(7, 2), false), - Field::new("ss_net_profit", DataType::Decimal128(7, 2), false), - ]), - ), - TableDef::new( - "store_returns", - Schema::new(vec![ - Field::new("sr_returned_date_sk", DataType::Int32, false), - Field::new("sr_return_time_sk", DataType::Int32, false), - Field::new("sr_item_sk", DataType::Int32, false), - Field::new("sr_customer_sk", DataType::Int32, false), - Field::new("sr_cdemo_sk", DataType::Int32, false), - Field::new("sr_hdemo_sk", DataType::Int32, false), - Field::new("sr_addr_sk", DataType::Int32, false), - Field::new("sr_store_sk", DataType::Int32, false), - Field::new("sr_reason_sk", DataType::Int32, false), - Field::new("sr_ticket_number", DataType::Int64, false), - Field::new("sr_return_quantity", DataType::Int32, false), - Field::new("sr_return_amt", DataType::Decimal128(7, 2), false), - Field::new("sr_return_tax", DataType::Decimal128(7, 2), false), - Field::new("sr_return_amt_inc_tax", DataType::Decimal128(7, 2), false), - Field::new("sr_fee", DataType::Decimal128(7, 2), false), - Field::new("sr_return_ship_cost", DataType::Decimal128(7, 2), false), - Field::new("sr_refunded_cash", DataType::Decimal128(7, 2), false), - Field::new("sr_reversed_charge", DataType::Decimal128(7, 2), false), - Field::new("sr_store_credit", DataType::Decimal128(7, 2), false), - Field::new("sr_net_loss", DataType::Decimal128(7, 2), false), - ]), - ), - TableDef::new( - "web_sales", - Schema::new(vec![ - Field::new("ws_sold_date_sk", DataType::Int32, false), - Field::new("ws_sold_time_sk", DataType::Int32, false), - Field::new("ws_ship_date_sk", DataType::Int32, false), - Field::new("ws_item_sk", DataType::Int32, false), - Field::new("ws_bill_customer_sk", DataType::Int32, false), - Field::new("ws_bill_cdemo_sk", DataType::Int32, false), - Field::new("ws_bill_hdemo_sk", 
DataType::Int32, false), - Field::new("ws_bill_addr_sk", DataType::Int32, false), - Field::new("ws_ship_customer_sk", DataType::Int32, false), - Field::new("ws_ship_cdemo_sk", DataType::Int32, false), - Field::new("ws_ship_hdemo_sk", DataType::Int32, false), - Field::new("ws_ship_addr_sk", DataType::Int32, false), - Field::new("ws_web_page_sk", DataType::Int32, false), - Field::new("ws_web_site_sk", DataType::Int32, false), - Field::new("ws_ship_mode_sk", DataType::Int32, false), - Field::new("ws_warehouse_sk", DataType::Int32, false), - Field::new("ws_promo_sk", DataType::Int32, false), - Field::new("ws_order_number", DataType::Int64, false), - Field::new("ws_quantity", DataType::Int32, false), - Field::new("ws_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("ws_list_price", DataType::Decimal128(7, 2), false), - Field::new("ws_sales_price", DataType::Decimal128(7, 2), false), - Field::new("ws_ext_discount_amt", DataType::Decimal128(7, 2), false), - Field::new("ws_ext_sales_price", DataType::Decimal128(7, 2), false), - Field::new("ws_ext_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("ws_ext_list_price", DataType::Decimal128(7, 2), false), - Field::new("ws_ext_tax", DataType::Decimal128(7, 2), false), - Field::new("ws_coupon_amt", DataType::Decimal128(7, 2), false), - Field::new("ws_ext_ship_cost", DataType::Decimal128(7, 2), false), - Field::new("ws_net_paid", DataType::Decimal128(7, 2), false), - Field::new("ws_net_paid_inc_tax", DataType::Decimal128(7, 2), false), - Field::new("ws_net_paid_inc_ship", DataType::Decimal128(7, 2), false), - Field::new( - "ws_net_paid_inc_ship_tax", - DataType::Decimal128(7, 2), - false, - ), - Field::new("ws_net_profit", DataType::Decimal128(7, 2), false), - ]), - ), - TableDef::new( - "web_returns", - Schema::new(vec![ - Field::new("wr_returned_date_sk", DataType::Int32, false), - Field::new("wr_returned_time_sk", DataType::Int32, false), - Field::new("wr_item_sk", DataType::Int32, false), - 
Field::new("wr_refunded_customer_sk", DataType::Int32, false), - Field::new("wr_refunded_cdemo_sk", DataType::Int32, false), - Field::new("wr_refunded_hdemo_sk", DataType::Int32, false), - Field::new("wr_refunded_addr_sk", DataType::Int32, false), - Field::new("wr_returning_customer_sk", DataType::Int32, false), - Field::new("wr_returning_cdemo_sk", DataType::Int32, false), - Field::new("wr_returning_hdemo_sk", DataType::Int32, false), - Field::new("wr_returning_addr_sk", DataType::Int32, false), - Field::new("wr_web_page_sk", DataType::Int32, false), - Field::new("wr_reason_sk", DataType::Int32, false), - Field::new("wr_order_number", DataType::Int64, false), - Field::new("wr_return_quantity", DataType::Int32, false), - Field::new("wr_return_amt", DataType::Decimal128(7, 2), false), - Field::new("wr_return_tax", DataType::Decimal128(7, 2), false), - Field::new("wr_return_amt_inc_tax", DataType::Decimal128(7, 2), false), - Field::new("wr_fee", DataType::Decimal128(7, 2), false), - Field::new("wr_return_ship_cost", DataType::Decimal128(7, 2), false), - Field::new("wr_refunded_cash", DataType::Decimal128(7, 2), false), - Field::new("wr_reversed_charge", DataType::Decimal128(7, 2), false), - Field::new("wr_account_credit", DataType::Decimal128(7, 2), false), - Field::new("wr_net_loss", DataType::Decimal128(7, 2), false), - ]), - ), - TableDef::new( - "call_center", - Schema::new(vec![ - Field::new("cc_call_center_sk", DataType::Int32, false), - Field::new("cc_call_center_id", DataType::Utf8, false), - Field::new("cc_rec_start_date", DataType::Date32, false), - Field::new("cc_rec_end_date", DataType::Date32, false), - Field::new("cc_closed_date_sk", DataType::Int32, false), - Field::new("cc_open_date_sk", DataType::Int32, false), - Field::new("cc_name", DataType::Utf8, false), - Field::new("cc_class", DataType::Utf8, false), - Field::new("cc_employees", DataType::Int32, false), - Field::new("cc_sq_ft", DataType::Int32, false), - Field::new("cc_hours", DataType::Utf8, 
false), - Field::new("cc_manager", DataType::Utf8, false), - Field::new("cc_mkt_id", DataType::Int32, false), - Field::new("cc_mkt_class", DataType::Utf8, false), - Field::new("cc_mkt_desc", DataType::Utf8, false), - Field::new("cc_market_manager", DataType::Utf8, false), - Field::new("cc_division", DataType::Int32, false), - Field::new("cc_division_name", DataType::Utf8, false), - Field::new("cc_company", DataType::Int32, false), - Field::new("cc_company_name", DataType::Utf8, false), - Field::new("cc_street_number", DataType::Utf8, false), - Field::new("cc_street_name", DataType::Utf8, false), - Field::new("cc_street_type", DataType::Utf8, false), - Field::new("cc_suite_number", DataType::Utf8, false), - Field::new("cc_city", DataType::Utf8, false), - Field::new("cc_county", DataType::Utf8, false), - Field::new("cc_state", DataType::Utf8, false), - Field::new("cc_zip", DataType::Utf8, false), - Field::new("cc_country", DataType::Utf8, false), - Field::new("cc_gmt_offset", DataType::Decimal128(5, 2), false), - Field::new("cc_tax_percentage", DataType::Decimal128(5, 2), false), - ]), - ), - TableDef::new( - "catalog_page", - Schema::new(vec![ - Field::new("cp_catalog_page_sk", DataType::Int32, false), - Field::new("cp_catalog_page_id", DataType::Utf8, false), - Field::new("cp_start_date_sk", DataType::Int32, false), - Field::new("cp_end_date_sk", DataType::Int32, false), - Field::new("cp_department", DataType::Utf8, false), - Field::new("cp_catalog_number", DataType::Int32, false), - Field::new("cp_catalog_page_number", DataType::Int32, false), - Field::new("cp_description", DataType::Utf8, false), - Field::new("cp_type", DataType::Utf8, false), - ]), - ), - TableDef::new( - "customer", - Schema::new(vec![ - Field::new("c_customer_sk", DataType::Int32, false), - Field::new("c_customer_id", DataType::Utf8, false), - Field::new("c_current_cdemo_sk", DataType::Int32, false), - Field::new("c_current_hdemo_sk", DataType::Int32, false), - Field::new("c_current_addr_sk", 
DataType::Int32, false), - Field::new("c_first_shipto_date_sk", DataType::Int32, false), - Field::new("c_first_sales_date_sk", DataType::Int32, false), - Field::new("c_salutation", DataType::Utf8, false), - Field::new("c_first_name", DataType::Utf8, false), - Field::new("c_last_name", DataType::Utf8, false), - Field::new("c_preferred_cust_flag", DataType::Utf8, false), - Field::new("c_birth_day", DataType::Int32, false), - Field::new("c_birth_month", DataType::Int32, false), - Field::new("c_birth_year", DataType::Int32, false), - Field::new("c_birth_country", DataType::Utf8, false), - Field::new("c_login", DataType::Utf8, false), - Field::new("c_email_address", DataType::Utf8, false), - Field::new("c_last_review_date_sk", DataType::Utf8, false), - ]), - ), - TableDef::new( - "customer_address", - Schema::new(vec![ - Field::new("ca_address_sk", DataType::Int32, false), - Field::new("ca_address_id", DataType::Utf8, false), - Field::new("ca_street_number", DataType::Utf8, false), - Field::new("ca_street_name", DataType::Utf8, false), - Field::new("ca_street_type", DataType::Utf8, false), - Field::new("ca_suite_number", DataType::Utf8, false), - Field::new("ca_city", DataType::Utf8, false), - Field::new("ca_county", DataType::Utf8, false), - Field::new("ca_state", DataType::Utf8, false), - Field::new("ca_zip", DataType::Utf8, false), - Field::new("ca_country", DataType::Utf8, false), - Field::new("ca_gmt_offset", DataType::Decimal128(5, 2), false), - Field::new("ca_location_type", DataType::Utf8, false), - ]), - ), - TableDef::new( - "customer_demographics", - Schema::new(vec![ - Field::new("cd_demo_sk", DataType::Int32, false), - Field::new("cd_gender", DataType::Utf8, false), - Field::new("cd_marital_status", DataType::Utf8, false), - Field::new("cd_education_status", DataType::Utf8, false), - Field::new("cd_purchase_estimate", DataType::Int32, false), - Field::new("cd_credit_rating", DataType::Utf8, false), - Field::new("cd_dep_count", DataType::Int32, false), - 
Field::new("cd_dep_employed_count", DataType::Int32, false), - Field::new("cd_dep_college_count", DataType::Int32, false), - ]), - ), - TableDef::new( - "date_dim", - Schema::new(vec![ - Field::new("d_date_sk", DataType::Int32, false), - Field::new("d_date_id", DataType::Utf8, false), - Field::new("d_date", DataType::Date32, false), - Field::new("d_month_seq", DataType::Int32, false), - Field::new("d_week_seq", DataType::Int32, false), - Field::new("d_quarter_seq", DataType::Int32, false), - Field::new("d_year", DataType::Int32, false), - Field::new("d_dow", DataType::Int32, false), - Field::new("d_moy", DataType::Int32, false), - Field::new("d_dom", DataType::Int32, false), - Field::new("d_qoy", DataType::Int32, false), - Field::new("d_fy_year", DataType::Int32, false), - Field::new("d_fy_quarter_seq", DataType::Int32, false), - Field::new("d_fy_week_seq", DataType::Int32, false), - Field::new("d_day_name", DataType::Utf8, false), - Field::new("d_quarter_name", DataType::Utf8, false), - Field::new("d_holiday", DataType::Utf8, false), - Field::new("d_weekend", DataType::Utf8, false), - Field::new("d_following_holiday", DataType::Utf8, false), - Field::new("d_first_dom", DataType::Int32, false), - Field::new("d_last_dom", DataType::Int32, false), - Field::new("d_same_day_ly", DataType::Int32, false), - Field::new("d_same_day_lq", DataType::Int32, false), - Field::new("d_current_day", DataType::Utf8, false), - Field::new("d_current_week", DataType::Utf8, false), - Field::new("d_current_month", DataType::Utf8, false), - Field::new("d_current_quarter", DataType::Utf8, false), - Field::new("d_current_year", DataType::Utf8, false), - ]), - ), - TableDef::new( - "household_demographics", - Schema::new(vec![ - Field::new("hd_demo_sk", DataType::Int32, false), - Field::new("hd_income_band_sk", DataType::Int32, false), - Field::new("hd_buy_potential", DataType::Utf8, false), - Field::new("hd_dep_count", DataType::Int32, false), - Field::new("hd_vehicle_count", 
DataType::Int32, false), - ]), - ), - TableDef::new( - "income_band", - Schema::new(vec![ - Field::new("ib_income_band_sk", DataType::Int32, false), - Field::new("ib_lower_bound", DataType::Int32, false), - Field::new("ib_upper_bound", DataType::Int32, false), - ]), - ), - TableDef::new( - "item", - Schema::new(vec![ - Field::new("i_item_sk", DataType::Int32, false), - Field::new("i_item_id", DataType::Utf8, false), - Field::new("i_rec_start_date", DataType::Date32, false), - Field::new("i_rec_end_date", DataType::Date32, false), - Field::new("i_item_desc", DataType::Utf8, false), - Field::new("i_current_price", DataType::Decimal128(7, 2), false), - Field::new("i_wholesale_cost", DataType::Decimal128(7, 2), false), - Field::new("i_brand_id", DataType::Int32, false), - Field::new("i_brand", DataType::Utf8, false), - Field::new("i_class_id", DataType::Int32, false), - Field::new("i_class", DataType::Utf8, false), - Field::new("i_category_id", DataType::Int32, false), - Field::new("i_category", DataType::Utf8, false), - Field::new("i_manufact_id", DataType::Int32, false), - Field::new("i_manufact", DataType::Utf8, false), - Field::new("i_size", DataType::Utf8, false), - Field::new("i_formulation", DataType::Utf8, false), - Field::new("i_color", DataType::Utf8, false), - Field::new("i_units", DataType::Utf8, false), - Field::new("i_container", DataType::Utf8, false), - Field::new("i_manager_id", DataType::Int32, false), - Field::new("i_product_name", DataType::Utf8, false), - ]), - ), - TableDef::new( - "promotion", - Schema::new(vec![ - Field::new("p_promo_sk", DataType::Int32, false), - Field::new("p_promo_id", DataType::Utf8, false), - Field::new("p_start_date_sk", DataType::Int32, false), - Field::new("p_end_date_sk", DataType::Int32, false), - Field::new("p_item_sk", DataType::Int32, false), - Field::new("p_cost", DataType::Decimal128(15, 2), false), - Field::new("p_response_target", DataType::Int32, false), - Field::new("p_promo_name", DataType::Utf8, false), - 
Field::new("p_channel_dmail", DataType::Utf8, false), - Field::new("p_channel_email", DataType::Utf8, false), - Field::new("p_channel_catalog", DataType::Utf8, false), - Field::new("p_channel_tv", DataType::Utf8, false), - Field::new("p_channel_radio", DataType::Utf8, false), - Field::new("p_channel_press", DataType::Utf8, false), - Field::new("p_channel_event", DataType::Utf8, false), - Field::new("p_channel_demo", DataType::Utf8, false), - Field::new("p_channel_details", DataType::Utf8, false), - Field::new("p_purpose", DataType::Utf8, false), - Field::new("p_discount_active", DataType::Utf8, false), - ]), - ), - TableDef::new( - "reason", - Schema::new(vec![ - Field::new("r_reason_sk", DataType::Int32, false), - Field::new("r_reason_id", DataType::Utf8, false), - Field::new("r_reason_desc", DataType::Utf8, false), - ]), - ), - TableDef::new( - "ship_mode", - //), - Schema::new(vec![ - Field::new("sm_ship_mode_sk", DataType::Int32, false), - Field::new("sm_ship_mode_id", DataType::Utf8, false), - Field::new("sm_type", DataType::Utf8, false), - Field::new("sm_code", DataType::Utf8, false), - Field::new("sm_carrier", DataType::Utf8, false), - Field::new("sm_contract", DataType::Utf8, false), - ]), - ), - TableDef::new( - "store", - Schema::new(vec![ - Field::new("s_store_sk", DataType::Int32, false), - Field::new("s_store_id", DataType::Utf8, false), - Field::new("s_rec_start_date", DataType::Date32, false), - Field::new("s_rec_end_date", DataType::Date32, false), - Field::new("s_closed_date_sk", DataType::Int32, false), - Field::new("s_store_name", DataType::Utf8, false), - Field::new("s_number_employees", DataType::Int32, false), - Field::new("s_floor_space", DataType::Int32, false), - Field::new("s_hours", DataType::Utf8, false), - Field::new("s_manager", DataType::Utf8, false), - Field::new("s_market_id", DataType::Int32, false), - Field::new("s_geography_class", DataType::Utf8, false), - Field::new("s_market_desc", DataType::Utf8, false), - 
Field::new("s_market_manager", DataType::Utf8, false), - Field::new("s_division_id", DataType::Int32, false), - Field::new("s_division_name", DataType::Utf8, false), - Field::new("s_company_id", DataType::Int32, false), - Field::new("s_company_name", DataType::Utf8, false), - Field::new("s_street_number", DataType::Utf8, false), - Field::new("s_street_name", DataType::Utf8, false), - Field::new("s_street_type", DataType::Utf8, false), - Field::new("s_suite_number", DataType::Utf8, false), - Field::new("s_city", DataType::Utf8, false), - Field::new("s_county", DataType::Utf8, false), - Field::new("s_state", DataType::Utf8, false), - Field::new("s_zip", DataType::Utf8, false), - Field::new("s_country", DataType::Utf8, false), - Field::new("s_gmt_offset", DataType::Decimal128(5, 2), false), - Field::new("s_tax_precentage", DataType::Decimal128(5, 2), false), - ]), - ), - TableDef::new( - "time_dim", - Schema::new(vec![ - Field::new("t_time_sk", DataType::Int32, false), - Field::new("t_time_id", DataType::Utf8, false), - Field::new("t_time", DataType::Int32, false), - Field::new("t_hour", DataType::Int32, false), - Field::new("t_minute", DataType::Int32, false), - Field::new("t_second", DataType::Int32, false), - Field::new("t_am_pm", DataType::Utf8, false), - Field::new("t_shift", DataType::Utf8, false), - Field::new("t_sub_shift", DataType::Utf8, false), - Field::new("t_meal_time", DataType::Utf8, false), - ]), - ), - TableDef::new( - "warehouse", - //), - Schema::new(vec![ - Field::new("w_warehouse_sk", DataType::Int32, false), - Field::new("w_warehouse_id", DataType::Utf8, false), - Field::new("w_warehouse_name", DataType::Utf8, false), - Field::new("w_warehouse_sq_ft", DataType::Int32, false), - Field::new("w_street_number", DataType::Utf8, false), - Field::new("w_street_name", DataType::Utf8, false), - Field::new("w_street_type", DataType::Utf8, false), - Field::new("w_suite_number", DataType::Utf8, false), - Field::new("w_city", DataType::Utf8, false), - 
Field::new("w_county", DataType::Utf8, false), - Field::new("w_state", DataType::Utf8, false), - Field::new("w_zip", DataType::Utf8, false), - Field::new("w_country", DataType::Utf8, false), - Field::new("w_gmt_offset", DataType::Decimal128(5, 2), false), - ]), - ), - TableDef::new( - "web_page", - Schema::new(vec![ - Field::new("wp_web_page_sk", DataType::Int32, false), - Field::new("wp_web_page_id", DataType::Utf8, false), - Field::new("wp_rec_start_date", DataType::Date32, false), - Field::new("wp_rec_end_date", DataType::Date32, false), - Field::new("wp_creation_date_sk", DataType::Int32, false), - Field::new("wp_access_date_sk", DataType::Int32, false), - Field::new("wp_autogen_flag", DataType::Utf8, false), - Field::new("wp_customer_sk", DataType::Int32, false), - Field::new("wp_url", DataType::Utf8, false), - Field::new("wp_type", DataType::Utf8, false), - Field::new("wp_char_count", DataType::Int32, false), - Field::new("wp_link_count", DataType::Int32, false), - Field::new("wp_image_count", DataType::Int32, false), - Field::new("wp_max_ad_count", DataType::Int32, false), - ]), - ), - TableDef::new( - "web_site", - Schema::new(vec![ - Field::new("web_site_sk", DataType::Int32, false), - Field::new("web_site_id", DataType::Utf8, false), - Field::new("web_rec_start_date", DataType::Date32, false), - Field::new("web_rec_end_date", DataType::Date32, false), - Field::new("web_name", DataType::Utf8, false), - Field::new("web_open_date_sk", DataType::Int32, false), - Field::new("web_close_date_sk", DataType::Int32, false), - Field::new("web_class", DataType::Utf8, false), - Field::new("web_manager", DataType::Utf8, false), - Field::new("web_mkt_id", DataType::Int32, false), - Field::new("web_mkt_class", DataType::Utf8, false), - Field::new("web_mkt_desc", DataType::Utf8, false), - Field::new("web_market_manager", DataType::Utf8, false), - Field::new("web_company_id", DataType::Int32, false), - Field::new("web_company_name", DataType::Utf8, false), - 
Field::new("web_street_number", DataType::Utf8, false), - Field::new("web_street_name", DataType::Utf8, false), - Field::new("web_street_type", DataType::Utf8, false), - Field::new("web_suite_number", DataType::Utf8, false), - Field::new("web_city", DataType::Utf8, false), - Field::new("web_county", DataType::Utf8, false), - Field::new("web_state", DataType::Utf8, false), - Field::new("web_zip", DataType::Utf8, false), - Field::new("web_country", DataType::Utf8, false), - Field::new("web_gmt_offset", DataType::Decimal128(5, 2), false), - Field::new("web_tax_percentage", DataType::Decimal128(5, 2), false), - ]), - ), - ] -} diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 777a24470232..3ddba2fec800 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -16,6 +16,7 @@ // under the License. //! Common functions used for testing +use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_int32_array; use rand::prelude::StdRng; @@ -23,6 +24,8 @@ use rand::{Rng, SeedableRng}; mod data_gen; mod string_gen; +pub mod tpcds; +pub mod tpch; pub use data_gen::AccessLogGenerator; pub use string_gen::StringBatchGenerator; @@ -105,3 +108,18 @@ pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec, schema: Schema) -> Self { + Self { + name: name.into(), + schema, + } + } +} diff --git a/test-utils/src/tpcds.rs b/test-utils/src/tpcds.rs new file mode 100644 index 000000000000..ce5bac5bfd83 --- /dev/null +++ b/test-utils/src/tpcds.rs @@ -0,0 +1,579 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::TableDef; +use arrow::datatypes::{DataType, Field, Schema}; + +pub fn tpcds_schemas() -> Vec<TableDef> { + vec![ + TableDef::new( + "catalog_sales", + Schema::new(vec![ + Field::new("cs_sold_date_sk", DataType::Int32, false), + Field::new("cs_sold_time_sk", DataType::Int32, false), + Field::new("cs_ship_date_sk", DataType::Int32, false), + Field::new("cs_bill_customer_sk", DataType::Int32, false), + Field::new("cs_bill_cdemo_sk", DataType::Int32, false), + Field::new("cs_bill_hdemo_sk", DataType::Int32, false), + Field::new("cs_bill_addr_sk", DataType::Int32, false), + Field::new("cs_ship_customer_sk", DataType::Int32, false), + Field::new("cs_ship_cdemo_sk", DataType::Int32, false), + Field::new("cs_ship_hdemo_sk", DataType::Int32, false), + Field::new("cs_ship_addr_sk", DataType::Int32, false), + Field::new("cs_call_center_sk", DataType::Int32, false), + Field::new("cs_catalog_page_sk", DataType::Int32, false), + Field::new("cs_ship_mode_sk", DataType::Int32, false), + Field::new("cs_warehouse_sk", DataType::Int32, false), + Field::new("cs_item_sk", DataType::Int32, false), + Field::new("cs_promo_sk", DataType::Int32, false), + Field::new("cs_order_number", DataType::Int64, false), + Field::new("cs_quantity", DataType::Int32, false), + Field::new("cs_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("cs_list_price", DataType::Decimal128(7, 2), false), + Field::new("cs_sales_price", DataType::Decimal128(7, 2), false), + Field::new("cs_ext_discount_amt", DataType::Decimal128(7, 2), false), + Field::new("cs_ext_sales_price",
DataType::Decimal128(7, 2), false), + Field::new("cs_ext_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("cs_ext_list_price", DataType::Decimal128(7, 2), false), + Field::new("cs_ext_tax", DataType::Decimal128(7, 2), false), + Field::new("cs_coupon_amt", DataType::Decimal128(7, 2), false), + Field::new("cs_ext_ship_cost", DataType::Decimal128(7, 2), false), + Field::new("cs_net_paid", DataType::Decimal128(7, 2), false), + Field::new("cs_net_paid_inc_tax", DataType::Decimal128(7, 2), false), + Field::new("cs_net_paid_inc_ship", DataType::Decimal128(7, 2), false), + Field::new( + "cs_net_paid_inc_ship_tax", + DataType::Decimal128(7, 2), + false, + ), + Field::new("cs_net_profit", DataType::Decimal128(7, 2), false), + ]), + ), + TableDef::new( + "catalog_returns", + Schema::new(vec![ + Field::new("cr_returned_date_sk", DataType::Int32, false), + Field::new("cr_returned_time_sk", DataType::Int32, false), + Field::new("cr_item_sk", DataType::Int32, false), + Field::new("cr_refunded_customer_sk", DataType::Int32, false), + Field::new("cr_refunded_cdemo_sk", DataType::Int32, false), + Field::new("cr_refunded_hdemo_sk", DataType::Int32, false), + Field::new("cr_refunded_addr_sk", DataType::Int32, false), + Field::new("cr_returning_customer_sk", DataType::Int32, false), + Field::new("cr_returning_cdemo_sk", DataType::Int32, false), + Field::new("cr_returning_hdemo_sk", DataType::Int32, false), + Field::new("cr_returning_addr_sk", DataType::Int32, false), + Field::new("cr_call_center_sk", DataType::Int32, false), + Field::new("cr_catalog_page_sk", DataType::Int32, false), + Field::new("cr_ship_mode_sk", DataType::Int32, false), + Field::new("cr_warehouse_sk", DataType::Int32, false), + Field::new("cr_reason_sk", DataType::Int32, false), + Field::new("cr_order_number", DataType::Int64, false), + Field::new("cr_return_quantity", DataType::Int32, false), + Field::new("cr_return_amount", DataType::Decimal128(7, 2), false), + Field::new("cr_return_tax", 
DataType::Decimal128(7, 2), false), + Field::new("cr_return_amt_inc_tax", DataType::Decimal128(7, 2), false), + Field::new("cr_fee", DataType::Decimal128(7, 2), false), + Field::new("cr_return_ship_cost", DataType::Decimal128(7, 2), false), + Field::new("cr_refunded_cash", DataType::Decimal128(7, 2), false), + Field::new("cr_reversed_charge", DataType::Decimal128(7, 2), false), + Field::new("cr_store_credit", DataType::Decimal128(7, 2), false), + Field::new("cr_net_loss", DataType::Decimal128(7, 2), false), + ]), + ), + TableDef::new( + "inventory", + Schema::new(vec![ + Field::new("inv_date_sk", DataType::Int32, false), + Field::new("inv_item_sk", DataType::Int32, false), + Field::new("inv_warehouse_sk", DataType::Int32, false), + Field::new("inv_quantity_on_hand", DataType::Int32, false), + ]), + ), + TableDef::new( + "store_sales", + Schema::new(vec![ + Field::new("ss_sold_date_sk", DataType::Int32, false), + Field::new("ss_sold_time_sk", DataType::Int32, false), + Field::new("ss_item_sk", DataType::Int32, false), + Field::new("ss_customer_sk", DataType::Int32, false), + Field::new("ss_cdemo_sk", DataType::Int32, false), + Field::new("ss_hdemo_sk", DataType::Int32, false), + Field::new("ss_addr_sk", DataType::Int32, false), + Field::new("ss_store_sk", DataType::Int32, false), + Field::new("ss_promo_sk", DataType::Int32, false), + Field::new("ss_ticket_number", DataType::Int64, false), + Field::new("ss_quantity", DataType::Int32, false), + Field::new("ss_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("ss_list_price", DataType::Decimal128(7, 2), false), + Field::new("ss_sales_price", DataType::Decimal128(7, 2), false), + Field::new("ss_ext_discount_amt", DataType::Decimal128(7, 2), false), + Field::new("ss_ext_sales_price", DataType::Decimal128(7, 2), false), + Field::new("ss_ext_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("ss_ext_list_price", DataType::Decimal128(7, 2), false), + Field::new("ss_ext_tax", 
DataType::Decimal128(7, 2), false), + Field::new("ss_coupon_amt", DataType::Decimal128(7, 2), false), + Field::new("ss_net_paid", DataType::Decimal128(7, 2), false), + Field::new("ss_net_paid_inc_tax", DataType::Decimal128(7, 2), false), + Field::new("ss_net_profit", DataType::Decimal128(7, 2), false), + ]), + ), + TableDef::new( + "store_returns", + Schema::new(vec![ + Field::new("sr_returned_date_sk", DataType::Int32, false), + Field::new("sr_return_time_sk", DataType::Int32, false), + Field::new("sr_item_sk", DataType::Int32, false), + Field::new("sr_customer_sk", DataType::Int32, false), + Field::new("sr_cdemo_sk", DataType::Int32, false), + Field::new("sr_hdemo_sk", DataType::Int32, false), + Field::new("sr_addr_sk", DataType::Int32, false), + Field::new("sr_store_sk", DataType::Int32, false), + Field::new("sr_reason_sk", DataType::Int32, false), + Field::new("sr_ticket_number", DataType::Int64, false), + Field::new("sr_return_quantity", DataType::Int32, false), + Field::new("sr_return_amt", DataType::Decimal128(7, 2), false), + Field::new("sr_return_tax", DataType::Decimal128(7, 2), false), + Field::new("sr_return_amt_inc_tax", DataType::Decimal128(7, 2), false), + Field::new("sr_fee", DataType::Decimal128(7, 2), false), + Field::new("sr_return_ship_cost", DataType::Decimal128(7, 2), false), + Field::new("sr_refunded_cash", DataType::Decimal128(7, 2), false), + Field::new("sr_reversed_charge", DataType::Decimal128(7, 2), false), + Field::new("sr_store_credit", DataType::Decimal128(7, 2), false), + Field::new("sr_net_loss", DataType::Decimal128(7, 2), false), + ]), + ), + TableDef::new( + "web_sales", + Schema::new(vec![ + Field::new("ws_sold_date_sk", DataType::Int32, false), + Field::new("ws_sold_time_sk", DataType::Int32, false), + Field::new("ws_ship_date_sk", DataType::Int32, false), + Field::new("ws_item_sk", DataType::Int32, false), + Field::new("ws_bill_customer_sk", DataType::Int32, false), + Field::new("ws_bill_cdemo_sk", DataType::Int32, false), + 
Field::new("ws_bill_hdemo_sk", DataType::Int32, false), + Field::new("ws_bill_addr_sk", DataType::Int32, false), + Field::new("ws_ship_customer_sk", DataType::Int32, false), + Field::new("ws_ship_cdemo_sk", DataType::Int32, false), + Field::new("ws_ship_hdemo_sk", DataType::Int32, false), + Field::new("ws_ship_addr_sk", DataType::Int32, false), + Field::new("ws_web_page_sk", DataType::Int32, false), + Field::new("ws_web_site_sk", DataType::Int32, false), + Field::new("ws_ship_mode_sk", DataType::Int32, false), + Field::new("ws_warehouse_sk", DataType::Int32, false), + Field::new("ws_promo_sk", DataType::Int32, false), + Field::new("ws_order_number", DataType::Int64, false), + Field::new("ws_quantity", DataType::Int32, false), + Field::new("ws_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("ws_list_price", DataType::Decimal128(7, 2), false), + Field::new("ws_sales_price", DataType::Decimal128(7, 2), false), + Field::new("ws_ext_discount_amt", DataType::Decimal128(7, 2), false), + Field::new("ws_ext_sales_price", DataType::Decimal128(7, 2), false), + Field::new("ws_ext_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("ws_ext_list_price", DataType::Decimal128(7, 2), false), + Field::new("ws_ext_tax", DataType::Decimal128(7, 2), false), + Field::new("ws_coupon_amt", DataType::Decimal128(7, 2), false), + Field::new("ws_ext_ship_cost", DataType::Decimal128(7, 2), false), + Field::new("ws_net_paid", DataType::Decimal128(7, 2), false), + Field::new("ws_net_paid_inc_tax", DataType::Decimal128(7, 2), false), + Field::new("ws_net_paid_inc_ship", DataType::Decimal128(7, 2), false), + Field::new( + "ws_net_paid_inc_ship_tax", + DataType::Decimal128(7, 2), + false, + ), + Field::new("ws_net_profit", DataType::Decimal128(7, 2), false), + ]), + ), + TableDef::new( + "web_returns", + Schema::new(vec![ + Field::new("wr_returned_date_sk", DataType::Int32, false), + Field::new("wr_returned_time_sk", DataType::Int32, false), + 
Field::new("wr_item_sk", DataType::Int32, false), + Field::new("wr_refunded_customer_sk", DataType::Int32, false), + Field::new("wr_refunded_cdemo_sk", DataType::Int32, false), + Field::new("wr_refunded_hdemo_sk", DataType::Int32, false), + Field::new("wr_refunded_addr_sk", DataType::Int32, false), + Field::new("wr_returning_customer_sk", DataType::Int32, false), + Field::new("wr_returning_cdemo_sk", DataType::Int32, false), + Field::new("wr_returning_hdemo_sk", DataType::Int32, false), + Field::new("wr_returning_addr_sk", DataType::Int32, false), + Field::new("wr_web_page_sk", DataType::Int32, false), + Field::new("wr_reason_sk", DataType::Int32, false), + Field::new("wr_order_number", DataType::Int64, false), + Field::new("wr_return_quantity", DataType::Int32, false), + Field::new("wr_return_amt", DataType::Decimal128(7, 2), false), + Field::new("wr_return_tax", DataType::Decimal128(7, 2), false), + Field::new("wr_return_amt_inc_tax", DataType::Decimal128(7, 2), false), + Field::new("wr_fee", DataType::Decimal128(7, 2), false), + Field::new("wr_return_ship_cost", DataType::Decimal128(7, 2), false), + Field::new("wr_refunded_cash", DataType::Decimal128(7, 2), false), + Field::new("wr_reversed_charge", DataType::Decimal128(7, 2), false), + Field::new("wr_account_credit", DataType::Decimal128(7, 2), false), + Field::new("wr_net_loss", DataType::Decimal128(7, 2), false), + ]), + ), + TableDef::new( + "call_center", + Schema::new(vec![ + Field::new("cc_call_center_sk", DataType::Int32, false), + Field::new("cc_call_center_id", DataType::Utf8, false), + Field::new("cc_rec_start_date", DataType::Date32, false), + Field::new("cc_rec_end_date", DataType::Date32, false), + Field::new("cc_closed_date_sk", DataType::Int32, false), + Field::new("cc_open_date_sk", DataType::Int32, false), + Field::new("cc_name", DataType::Utf8, false), + Field::new("cc_class", DataType::Utf8, false), + Field::new("cc_employees", DataType::Int32, false), + Field::new("cc_sq_ft", 
DataType::Int32, false), + Field::new("cc_hours", DataType::Utf8, false), + Field::new("cc_manager", DataType::Utf8, false), + Field::new("cc_mkt_id", DataType::Int32, false), + Field::new("cc_mkt_class", DataType::Utf8, false), + Field::new("cc_mkt_desc", DataType::Utf8, false), + Field::new("cc_market_manager", DataType::Utf8, false), + Field::new("cc_division", DataType::Int32, false), + Field::new("cc_division_name", DataType::Utf8, false), + Field::new("cc_company", DataType::Int32, false), + Field::new("cc_company_name", DataType::Utf8, false), + Field::new("cc_street_number", DataType::Utf8, false), + Field::new("cc_street_name", DataType::Utf8, false), + Field::new("cc_street_type", DataType::Utf8, false), + Field::new("cc_suite_number", DataType::Utf8, false), + Field::new("cc_city", DataType::Utf8, false), + Field::new("cc_county", DataType::Utf8, false), + Field::new("cc_state", DataType::Utf8, false), + Field::new("cc_zip", DataType::Utf8, false), + Field::new("cc_country", DataType::Utf8, false), + Field::new("cc_gmt_offset", DataType::Decimal128(5, 2), false), + Field::new("cc_tax_percentage", DataType::Decimal128(5, 2), false), + ]), + ), + TableDef::new( + "catalog_page", + Schema::new(vec![ + Field::new("cp_catalog_page_sk", DataType::Int32, false), + Field::new("cp_catalog_page_id", DataType::Utf8, false), + Field::new("cp_start_date_sk", DataType::Int32, false), + Field::new("cp_end_date_sk", DataType::Int32, false), + Field::new("cp_department", DataType::Utf8, false), + Field::new("cp_catalog_number", DataType::Int32, false), + Field::new("cp_catalog_page_number", DataType::Int32, false), + Field::new("cp_description", DataType::Utf8, false), + Field::new("cp_type", DataType::Utf8, false), + ]), + ), + TableDef::new( + "customer", + Schema::new(vec![ + Field::new("c_customer_sk", DataType::Int32, false), + Field::new("c_customer_id", DataType::Utf8, false), + Field::new("c_current_cdemo_sk", DataType::Int32, false), + 
Field::new("c_current_hdemo_sk", DataType::Int32, false), + Field::new("c_current_addr_sk", DataType::Int32, false), + Field::new("c_first_shipto_date_sk", DataType::Int32, false), + Field::new("c_first_sales_date_sk", DataType::Int32, false), + Field::new("c_salutation", DataType::Utf8, false), + Field::new("c_first_name", DataType::Utf8, false), + Field::new("c_last_name", DataType::Utf8, false), + Field::new("c_preferred_cust_flag", DataType::Utf8, false), + Field::new("c_birth_day", DataType::Int32, false), + Field::new("c_birth_month", DataType::Int32, false), + Field::new("c_birth_year", DataType::Int32, false), + Field::new("c_birth_country", DataType::Utf8, false), + Field::new("c_login", DataType::Utf8, false), + Field::new("c_email_address", DataType::Utf8, false), + Field::new("c_last_review_date_sk", DataType::Utf8, false), + ]), + ), + TableDef::new( + "customer_address", + Schema::new(vec![ + Field::new("ca_address_sk", DataType::Int32, false), + Field::new("ca_address_id", DataType::Utf8, false), + Field::new("ca_street_number", DataType::Utf8, false), + Field::new("ca_street_name", DataType::Utf8, false), + Field::new("ca_street_type", DataType::Utf8, false), + Field::new("ca_suite_number", DataType::Utf8, false), + Field::new("ca_city", DataType::Utf8, false), + Field::new("ca_county", DataType::Utf8, false), + Field::new("ca_state", DataType::Utf8, false), + Field::new("ca_zip", DataType::Utf8, false), + Field::new("ca_country", DataType::Utf8, false), + Field::new("ca_gmt_offset", DataType::Decimal128(5, 2), false), + Field::new("ca_location_type", DataType::Utf8, false), + ]), + ), + TableDef::new( + "customer_demographics", + Schema::new(vec![ + Field::new("cd_demo_sk", DataType::Int32, false), + Field::new("cd_gender", DataType::Utf8, false), + Field::new("cd_marital_status", DataType::Utf8, false), + Field::new("cd_education_status", DataType::Utf8, false), + Field::new("cd_purchase_estimate", DataType::Int32, false), + 
Field::new("cd_credit_rating", DataType::Utf8, false), + Field::new("cd_dep_count", DataType::Int32, false), + Field::new("cd_dep_employed_count", DataType::Int32, false), + Field::new("cd_dep_college_count", DataType::Int32, false), + ]), + ), + TableDef::new( + "date_dim", + Schema::new(vec![ + Field::new("d_date_sk", DataType::Int32, false), + Field::new("d_date_id", DataType::Utf8, false), + Field::new("d_date", DataType::Date32, false), + Field::new("d_month_seq", DataType::Int32, false), + Field::new("d_week_seq", DataType::Int32, false), + Field::new("d_quarter_seq", DataType::Int32, false), + Field::new("d_year", DataType::Int32, false), + Field::new("d_dow", DataType::Int32, false), + Field::new("d_moy", DataType::Int32, false), + Field::new("d_dom", DataType::Int32, false), + Field::new("d_qoy", DataType::Int32, false), + Field::new("d_fy_year", DataType::Int32, false), + Field::new("d_fy_quarter_seq", DataType::Int32, false), + Field::new("d_fy_week_seq", DataType::Int32, false), + Field::new("d_day_name", DataType::Utf8, false), + Field::new("d_quarter_name", DataType::Utf8, false), + Field::new("d_holiday", DataType::Utf8, false), + Field::new("d_weekend", DataType::Utf8, false), + Field::new("d_following_holiday", DataType::Utf8, false), + Field::new("d_first_dom", DataType::Int32, false), + Field::new("d_last_dom", DataType::Int32, false), + Field::new("d_same_day_ly", DataType::Int32, false), + Field::new("d_same_day_lq", DataType::Int32, false), + Field::new("d_current_day", DataType::Utf8, false), + Field::new("d_current_week", DataType::Utf8, false), + Field::new("d_current_month", DataType::Utf8, false), + Field::new("d_current_quarter", DataType::Utf8, false), + Field::new("d_current_year", DataType::Utf8, false), + ]), + ), + TableDef::new( + "household_demographics", + Schema::new(vec![ + Field::new("hd_demo_sk", DataType::Int32, false), + Field::new("hd_income_band_sk", DataType::Int32, false), + Field::new("hd_buy_potential", 
DataType::Utf8, false), + Field::new("hd_dep_count", DataType::Int32, false), + Field::new("hd_vehicle_count", DataType::Int32, false), + ]), + ), + TableDef::new( + "income_band", + Schema::new(vec![ + Field::new("ib_income_band_sk", DataType::Int32, false), + Field::new("ib_lower_bound", DataType::Int32, false), + Field::new("ib_upper_bound", DataType::Int32, false), + ]), + ), + TableDef::new( + "item", + Schema::new(vec![ + Field::new("i_item_sk", DataType::Int32, false), + Field::new("i_item_id", DataType::Utf8, false), + Field::new("i_rec_start_date", DataType::Date32, false), + Field::new("i_rec_end_date", DataType::Date32, false), + Field::new("i_item_desc", DataType::Utf8, false), + Field::new("i_current_price", DataType::Decimal128(7, 2), false), + Field::new("i_wholesale_cost", DataType::Decimal128(7, 2), false), + Field::new("i_brand_id", DataType::Int32, false), + Field::new("i_brand", DataType::Utf8, false), + Field::new("i_class_id", DataType::Int32, false), + Field::new("i_class", DataType::Utf8, false), + Field::new("i_category_id", DataType::Int32, false), + Field::new("i_category", DataType::Utf8, false), + Field::new("i_manufact_id", DataType::Int32, false), + Field::new("i_manufact", DataType::Utf8, false), + Field::new("i_size", DataType::Utf8, false), + Field::new("i_formulation", DataType::Utf8, false), + Field::new("i_color", DataType::Utf8, false), + Field::new("i_units", DataType::Utf8, false), + Field::new("i_container", DataType::Utf8, false), + Field::new("i_manager_id", DataType::Int32, false), + Field::new("i_product_name", DataType::Utf8, false), + ]), + ), + TableDef::new( + "promotion", + Schema::new(vec![ + Field::new("p_promo_sk", DataType::Int32, false), + Field::new("p_promo_id", DataType::Utf8, false), + Field::new("p_start_date_sk", DataType::Int32, false), + Field::new("p_end_date_sk", DataType::Int32, false), + Field::new("p_item_sk", DataType::Int32, false), + Field::new("p_cost", DataType::Decimal128(15, 2), false), + 
Field::new("p_response_target", DataType::Int32, false), + Field::new("p_promo_name", DataType::Utf8, false), + Field::new("p_channel_dmail", DataType::Utf8, false), + Field::new("p_channel_email", DataType::Utf8, false), + Field::new("p_channel_catalog", DataType::Utf8, false), + Field::new("p_channel_tv", DataType::Utf8, false), + Field::new("p_channel_radio", DataType::Utf8, false), + Field::new("p_channel_press", DataType::Utf8, false), + Field::new("p_channel_event", DataType::Utf8, false), + Field::new("p_channel_demo", DataType::Utf8, false), + Field::new("p_channel_details", DataType::Utf8, false), + Field::new("p_purpose", DataType::Utf8, false), + Field::new("p_discount_active", DataType::Utf8, false), + ]), + ), + TableDef::new( + "reason", + Schema::new(vec![ + Field::new("r_reason_sk", DataType::Int32, false), + Field::new("r_reason_id", DataType::Utf8, false), + Field::new("r_reason_desc", DataType::Utf8, false), + ]), + ), + TableDef::new( + "ship_mode", + //), + Schema::new(vec![ + Field::new("sm_ship_mode_sk", DataType::Int32, false), + Field::new("sm_ship_mode_id", DataType::Utf8, false), + Field::new("sm_type", DataType::Utf8, false), + Field::new("sm_code", DataType::Utf8, false), + Field::new("sm_carrier", DataType::Utf8, false), + Field::new("sm_contract", DataType::Utf8, false), + ]), + ), + TableDef::new( + "store", + Schema::new(vec![ + Field::new("s_store_sk", DataType::Int32, false), + Field::new("s_store_id", DataType::Utf8, false), + Field::new("s_rec_start_date", DataType::Date32, false), + Field::new("s_rec_end_date", DataType::Date32, false), + Field::new("s_closed_date_sk", DataType::Int32, false), + Field::new("s_store_name", DataType::Utf8, false), + Field::new("s_number_employees", DataType::Int32, false), + Field::new("s_floor_space", DataType::Int32, false), + Field::new("s_hours", DataType::Utf8, false), + Field::new("s_manager", DataType::Utf8, false), + Field::new("s_market_id", DataType::Int32, false), + 
Field::new("s_geography_class", DataType::Utf8, false), + Field::new("s_market_desc", DataType::Utf8, false), + Field::new("s_market_manager", DataType::Utf8, false), + Field::new("s_division_id", DataType::Int32, false), + Field::new("s_division_name", DataType::Utf8, false), + Field::new("s_company_id", DataType::Int32, false), + Field::new("s_company_name", DataType::Utf8, false), + Field::new("s_street_number", DataType::Utf8, false), + Field::new("s_street_name", DataType::Utf8, false), + Field::new("s_street_type", DataType::Utf8, false), + Field::new("s_suite_number", DataType::Utf8, false), + Field::new("s_city", DataType::Utf8, false), + Field::new("s_county", DataType::Utf8, false), + Field::new("s_state", DataType::Utf8, false), + Field::new("s_zip", DataType::Utf8, false), + Field::new("s_country", DataType::Utf8, false), + Field::new("s_gmt_offset", DataType::Decimal128(5, 2), false), + Field::new("s_tax_precentage", DataType::Decimal128(5, 2), false), + ]), + ), + TableDef::new( + "time_dim", + Schema::new(vec![ + Field::new("t_time_sk", DataType::Int32, false), + Field::new("t_time_id", DataType::Utf8, false), + Field::new("t_time", DataType::Int32, false), + Field::new("t_hour", DataType::Int32, false), + Field::new("t_minute", DataType::Int32, false), + Field::new("t_second", DataType::Int32, false), + Field::new("t_am_pm", DataType::Utf8, false), + Field::new("t_shift", DataType::Utf8, false), + Field::new("t_sub_shift", DataType::Utf8, false), + Field::new("t_meal_time", DataType::Utf8, false), + ]), + ), + TableDef::new( + "warehouse", + //), + Schema::new(vec![ + Field::new("w_warehouse_sk", DataType::Int32, false), + Field::new("w_warehouse_id", DataType::Utf8, false), + Field::new("w_warehouse_name", DataType::Utf8, false), + Field::new("w_warehouse_sq_ft", DataType::Int32, false), + Field::new("w_street_number", DataType::Utf8, false), + Field::new("w_street_name", DataType::Utf8, false), + Field::new("w_street_type", DataType::Utf8, false), 
+ Field::new("w_suite_number", DataType::Utf8, false), + Field::new("w_city", DataType::Utf8, false), + Field::new("w_county", DataType::Utf8, false), + Field::new("w_state", DataType::Utf8, false), + Field::new("w_zip", DataType::Utf8, false), + Field::new("w_country", DataType::Utf8, false), + Field::new("w_gmt_offset", DataType::Decimal128(5, 2), false), + ]), + ), + TableDef::new( + "web_page", + Schema::new(vec![ + Field::new("wp_web_page_sk", DataType::Int32, false), + Field::new("wp_web_page_id", DataType::Utf8, false), + Field::new("wp_rec_start_date", DataType::Date32, false), + Field::new("wp_rec_end_date", DataType::Date32, false), + Field::new("wp_creation_date_sk", DataType::Int32, false), + Field::new("wp_access_date_sk", DataType::Int32, false), + Field::new("wp_autogen_flag", DataType::Utf8, false), + Field::new("wp_customer_sk", DataType::Int32, false), + Field::new("wp_url", DataType::Utf8, false), + Field::new("wp_type", DataType::Utf8, false), + Field::new("wp_char_count", DataType::Int32, false), + Field::new("wp_link_count", DataType::Int32, false), + Field::new("wp_image_count", DataType::Int32, false), + Field::new("wp_max_ad_count", DataType::Int32, false), + ]), + ), + TableDef::new( + "web_site", + Schema::new(vec![ + Field::new("web_site_sk", DataType::Int32, false), + Field::new("web_site_id", DataType::Utf8, false), + Field::new("web_rec_start_date", DataType::Date32, false), + Field::new("web_rec_end_date", DataType::Date32, false), + Field::new("web_name", DataType::Utf8, false), + Field::new("web_open_date_sk", DataType::Int32, false), + Field::new("web_close_date_sk", DataType::Int32, false), + Field::new("web_class", DataType::Utf8, false), + Field::new("web_manager", DataType::Utf8, false), + Field::new("web_mkt_id", DataType::Int32, false), + Field::new("web_mkt_class", DataType::Utf8, false), + Field::new("web_mkt_desc", DataType::Utf8, false), + Field::new("web_market_manager", DataType::Utf8, false), + 
Field::new("web_company_id", DataType::Int32, false), + Field::new("web_company_name", DataType::Utf8, false), + Field::new("web_street_number", DataType::Utf8, false), + Field::new("web_street_name", DataType::Utf8, false), + Field::new("web_street_type", DataType::Utf8, false), + Field::new("web_suite_number", DataType::Utf8, false), + Field::new("web_city", DataType::Utf8, false), + Field::new("web_county", DataType::Utf8, false), + Field::new("web_state", DataType::Utf8, false), + Field::new("web_zip", DataType::Utf8, false), + Field::new("web_country", DataType::Utf8, false), + Field::new("web_gmt_offset", DataType::Decimal128(5, 2), false), + Field::new("web_tax_percentage", DataType::Decimal128(5, 2), false), + ]), + ), + ] +} diff --git a/test-utils/src/tpch.rs b/test-utils/src/tpch.rs new file mode 100644 index 000000000000..636221f71e51 --- /dev/null +++ b/test-utils/src/tpch.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::TableDef; +use arrow::datatypes::{DataType, Field, Schema}; + +/// Schemas for the TPCH tables +pub fn tpch_schemas() -> Vec<TableDef> { + let lineitem_schema = Schema::new(vec![ + Field::new("l_orderkey", DataType::Int64, false), + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("l_linenumber", DataType::Int32, false), + Field::new("l_quantity", DataType::Decimal128(15, 2), false), + Field::new("l_extendedprice", DataType::Decimal128(15, 2), false), + Field::new("l_discount", DataType::Decimal128(15, 2), false), + Field::new("l_tax", DataType::Decimal128(15, 2), false), + Field::new("l_returnflag", DataType::Utf8, false), + Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_shipdate", DataType::Date32, false), + Field::new("l_commitdate", DataType::Date32, false), + Field::new("l_receiptdate", DataType::Date32, false), + Field::new("l_shipinstruct", DataType::Utf8, false), + Field::new("l_shipmode", DataType::Utf8, false), + Field::new("l_comment", DataType::Utf8, false), + ]); + + let orders_schema = Schema::new(vec![ + Field::new("o_orderkey", DataType::Int64, false), + Field::new("o_custkey", DataType::Int64, false), + Field::new("o_orderstatus", DataType::Utf8, false), + Field::new("o_totalprice", DataType::Decimal128(15, 2), false), + Field::new("o_orderdate", DataType::Date32, false), + Field::new("o_orderpriority", DataType::Utf8, false), + Field::new("o_clerk", DataType::Utf8, false), + Field::new("o_shippriority", DataType::Int32, false), + Field::new("o_comment", DataType::Utf8, false), + ]); + + let part_schema = Schema::new(vec![ + Field::new("p_partkey", DataType::Int64, false), + Field::new("p_name", DataType::Utf8, false), + Field::new("p_mfgr", DataType::Utf8, false), + Field::new("p_brand", DataType::Utf8, false), + Field::new("p_type", DataType::Utf8, false), + Field::new("p_size", DataType::Int32, false), + Field::new("p_container", DataType::Utf8, false), +
Field::new("p_retailprice", DataType::Decimal128(15, 2), false), + Field::new("p_comment", DataType::Utf8, false), + ]); + + let supplier_schema = Schema::new(vec![ + Field::new("s_suppkey", DataType::Int64, false), + Field::new("s_name", DataType::Utf8, false), + Field::new("s_address", DataType::Utf8, false), + Field::new("s_nationkey", DataType::Int64, false), + Field::new("s_phone", DataType::Utf8, false), + Field::new("s_acctbal", DataType::Decimal128(15, 2), false), + Field::new("s_comment", DataType::Utf8, false), + ]); + + let partsupp_schema = Schema::new(vec![ + Field::new("ps_partkey", DataType::Int64, false), + Field::new("ps_suppkey", DataType::Int64, false), + Field::new("ps_availqty", DataType::Int32, false), + Field::new("ps_supplycost", DataType::Decimal128(15, 2), false), + Field::new("ps_comment", DataType::Utf8, false), + ]); + + let customer_schema = Schema::new(vec![ + Field::new("c_custkey", DataType::Int64, false), + Field::new("c_name", DataType::Utf8, false), + Field::new("c_address", DataType::Utf8, false), + Field::new("c_nationkey", DataType::Int64, false), + Field::new("c_phone", DataType::Utf8, false), + Field::new("c_acctbal", DataType::Decimal128(15, 2), false), + Field::new("c_mktsegment", DataType::Utf8, false), + Field::new("c_comment", DataType::Utf8, false), + ]); + + let nation_schema = Schema::new(vec![ + Field::new("n_nationkey", DataType::Int64, false), + Field::new("n_name", DataType::Utf8, false), + Field::new("n_regionkey", DataType::Int64, false), + Field::new("n_comment", DataType::Utf8, false), + ]); + + let region_schema = Schema::new(vec![ + Field::new("r_regionkey", DataType::Int64, false), + Field::new("r_name", DataType::Utf8, false), + Field::new("r_comment", DataType::Utf8, false), + ]); + + vec![ + TableDef::new("lineitem", lineitem_schema), + TableDef::new("orders", orders_schema), + TableDef::new("part", part_schema), + TableDef::new("supplier", supplier_schema), + TableDef::new("partsupp", partsupp_schema), 
+ TableDef::new("customer", customer_schema), + TableDef::new("nation", nation_schema), + TableDef::new("region", region_schema), + ] +} From 2f550032140d42d1ee6d8ed86f7790766fa7302e Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 3 Apr 2024 22:20:01 +0200 Subject: [PATCH 2/2] Simplify Expr::map_children (#9876) * add map_until_stop_and_collect macro * fix clippy * simplify * Update datafusion/common/src/tree_node.rs Co-authored-by: Andrew Lamb * add documentation * fix macro --------- Co-authored-by: Andrew Lamb --- datafusion/common/src/tree_node.rs | 82 ++++++++-- datafusion/expr/src/tree_node/expr.rs | 226 ++++++++++++-------------- 2 files changed, 171 insertions(+), 137 deletions(-) diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 2d653a27c47b..554722f37ba2 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -532,8 +532,20 @@ impl Transformed { } } -/// Transformation helper to process tree nodes that are siblings. +/// Transformation helper to process a sequence of iterable tree nodes that are siblings. pub trait TransformedIterator: Iterator { + /// Applies `f` to each item in this iterator + /// + /// Visits all items in the iterator unless + /// `f` returns an error or `f` returns `TreeNodeRecursion::Stop`. + /// + /// # Returns + /// Error if `f` returns an error + /// + /// Ok(Transformed) such that: + /// 1. `transformed` is true if any return from `f` had transformed true + /// 2. `data` is the collection of `data` returned by `f`; items after a `Stop` are kept as is + /// 3. 
`tnr` from the last invocation of `f` or `Continue` if the iterator is empty fn map_until_stop_and_collect< F: FnMut(Self::Item) -> Result>, >( @@ -551,22 +563,64 @@ impl TransformedIterator for I { ) -> Result>> { let mut tnr = TreeNodeRecursion::Continue; let mut transformed = false; - let data = self - .map(|item| match tnr { - TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => { - f(item).map(|result| { - tnr = result.tnr; - transformed |= result.transformed; - result.data - }) - } - TreeNodeRecursion::Stop => Ok(item), - }) - .collect::>>()?; - Ok(Transformed::new(data, transformed, tnr)) + self.map(|item| match tnr { + TreeNodeRecursion::Continue | TreeNodeRecursion::Jump => { + f(item).map(|result| { + tnr = result.tnr; + transformed |= result.transformed; + result.data + }) + } + TreeNodeRecursion::Stop => Ok(item), + }) + .collect::>>() + .map(|data| Transformed::new(data, transformed, tnr)) } } +/// Transformation helper to process a heterogeneous sequence of tree node containing +/// expressions. +/// This macro is very similar to [TransformedIterator::map_until_stop_and_collect] to +/// process nodes that are siblings, but it accepts an initial transformation (`F0`) and +/// a sequence of pairs. Each pair is made of an expression (`EXPR`) and its +/// transformation (`F`). +/// +/// The macro builds up a tuple that contains `Transformed.data` result of `F0` as the +/// first element and further elements from the sequence of pairs. An element from a pair +/// is either the value of `EXPR` or the `Transformed.data` result of `F`, depending on +/// the `Transformed.tnr` result of previous `F`s (`F0` initially). +/// +/// # Returns +/// Error if any of the transformations returns an error +/// +/// Ok(Transformed<(data0, ..., dataN)>) such that: +/// 1. `transformed` is true if any of the transformations had transformed true +/// 2. `(data0, ..., dataN)`, where `data0` is the `Transformed.data` from `F0` and +/// `data1` ... 
`dataN` are from either `EXPR` or the `Transformed.data` of `F` +/// 3. `tnr` from `F0` or the last invocation of `F` +#[macro_export] +macro_rules! map_until_stop_and_collect { + ($F0:expr, $($EXPR:expr, $F:expr),*) => {{ + $F0.and_then(|Transformed { data: data0, mut transformed, mut tnr }| { + let all_datas = ( + data0, + $( + if tnr == TreeNodeRecursion::Continue || tnr == TreeNodeRecursion::Jump { + $F.map(|result| { + tnr = result.tnr; + transformed |= result.transformed; + result.data + })? + } else { + $EXPR + }, + )* + ); + Ok(Transformed::new(all_datas, transformed, tnr)) + }) + }} +} + /// Transformation helper to access [`Transformed`] fields in a [`Result`] easily. pub trait TransformedResult { fn data(self) -> Result; diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 0909d8f662f6..df1585e5a598 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -27,7 +27,9 @@ use crate::{Expr, GetFieldAccess}; use datafusion_common::tree_node::{ Transformed, TransformedIterator, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{handle_visit_recursion, internal_err, Result}; +use datafusion_common::{ + handle_visit_recursion, internal_err, map_until_stop_and_collect, Result, +}; impl TreeNode for Expr { fn apply_children Result>( @@ -167,15 +169,14 @@ impl TreeNode for Expr { Expr::InSubquery(InSubquery::new(be, subquery, negated)) }), Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - transform_box(left, &mut f)? - .update_data(|new_left| (new_left, right)) - .try_transform_node(|(new_left, right)| { - Ok(transform_box(right, &mut f)? - .update_data(|new_right| (new_left, new_right))) - })? - .update_data(|(new_left, new_right)| { - Expr::BinaryExpr(BinaryExpr::new(new_left, op, new_right)) - }) + map_until_stop_and_collect!( + transform_box(left, &mut f), + right, + transform_box(right, &mut f) + )? 
+ .update_data(|(new_left, new_right)| { + Expr::BinaryExpr(BinaryExpr::new(new_left, op, new_right)) + }) } Expr::Like(Like { negated, @@ -183,42 +184,40 @@ impl TreeNode for Expr { pattern, escape_char, case_insensitive, - }) => transform_box(expr, &mut f)? - .update_data(|new_expr| (new_expr, pattern)) - .try_transform_node(|(new_expr, pattern)| { - Ok(transform_box(pattern, &mut f)? - .update_data(|new_pattern| (new_expr, new_pattern))) - })? - .update_data(|(new_expr, new_pattern)| { - Expr::Like(Like::new( - negated, - new_expr, - new_pattern, - escape_char, - case_insensitive, - )) - }), + }) => map_until_stop_and_collect!( + transform_box(expr, &mut f), + pattern, + transform_box(pattern, &mut f) + )? + .update_data(|(new_expr, new_pattern)| { + Expr::Like(Like::new( + negated, + new_expr, + new_pattern, + escape_char, + case_insensitive, + )) + }), Expr::SimilarTo(Like { negated, expr, pattern, escape_char, case_insensitive, - }) => transform_box(expr, &mut f)? - .update_data(|new_expr| (new_expr, pattern)) - .try_transform_node(|(new_expr, pattern)| { - Ok(transform_box(pattern, &mut f)? - .update_data(|new_pattern| (new_expr, new_pattern))) - })? - .update_data(|(new_expr, new_pattern)| { - Expr::SimilarTo(Like::new( - negated, - new_expr, - new_pattern, - escape_char, - case_insensitive, - )) - }), + }) => map_until_stop_and_collect!( + transform_box(expr, &mut f), + pattern, + transform_box(pattern, &mut f) + )? + .update_data(|(new_expr, new_pattern)| { + Expr::SimilarTo(Like::new( + negated, + new_expr, + new_pattern, + escape_char, + case_insensitive, + )) + }), Expr::Not(expr) => transform_box(expr, &mut f)?.update_data(Expr::Not), Expr::IsNotNull(expr) => { transform_box(expr, &mut f)?.update_data(Expr::IsNotNull) @@ -248,48 +247,38 @@ impl TreeNode for Expr { negated, low, high, - }) => transform_box(expr, &mut f)? - .update_data(|new_expr| (new_expr, low, high)) - .try_transform_node(|(new_expr, low, high)| { - Ok(transform_box(low, &mut f)? 
- .update_data(|new_low| (new_expr, new_low, high))) - })? - .try_transform_node(|(new_expr, new_low, high)| { - Ok(transform_box(high, &mut f)? - .update_data(|new_high| (new_expr, new_low, new_high))) - })? - .update_data(|(new_expr, new_low, new_high)| { - Expr::Between(Between::new(new_expr, negated, new_low, new_high)) - }), + }) => map_until_stop_and_collect!( + transform_box(expr, &mut f), + low, + transform_box(low, &mut f), + high, + transform_box(high, &mut f) + )? + .update_data(|(new_expr, new_low, new_high)| { + Expr::Between(Between::new(new_expr, negated, new_low, new_high)) + }), Expr::Case(Case { expr, when_then_expr, else_expr, - }) => transform_option_box(expr, &mut f)? - .update_data(|new_expr| (new_expr, when_then_expr, else_expr)) - .try_transform_node(|(new_expr, when_then_expr, else_expr)| { - Ok(when_then_expr - .into_iter() - .map_until_stop_and_collect(|(when, then)| { - transform_box(when, &mut f)? - .update_data(|new_when| (new_when, then)) - .try_transform_node(|(new_when, then)| { - Ok(transform_box(then, &mut f)? - .update_data(|new_then| (new_when, new_then))) - }) - })? - .update_data(|new_when_then_expr| { - (new_expr, new_when_then_expr, else_expr) - })) - })? - .try_transform_node(|(new_expr, new_when_then_expr, else_expr)| { - Ok(transform_option_box(else_expr, &mut f)?.update_data( - |new_else_expr| (new_expr, new_when_then_expr, new_else_expr), - )) - })? - .update_data(|(new_expr, new_when_then_expr, new_else_expr)| { - Expr::Case(Case::new(new_expr, new_when_then_expr, new_else_expr)) - }), + }) => map_until_stop_and_collect!( + transform_option_box(expr, &mut f), + when_then_expr, + when_then_expr + .into_iter() + .map_until_stop_and_collect(|(when, then)| { + map_until_stop_and_collect!( + transform_box(when, &mut f), + then, + transform_box(then, &mut f) + ) + }), + else_expr, + transform_option_box(else_expr, &mut f) + )? 
+ .update_data(|(new_expr, new_when_then_expr, new_else_expr)| { + Expr::Case(Case::new(new_expr, new_when_then_expr, new_else_expr)) + }), Expr::Cast(Cast { expr, data_type }) => transform_box(expr, &mut f)? .update_data(|be| Expr::Cast(Cast::new(be, data_type))), Expr::TryCast(TryCast { expr, data_type }) => transform_box(expr, &mut f)? @@ -320,30 +309,23 @@ impl TreeNode for Expr { order_by, window_frame, null_treatment, - }) => transform_vec(args, &mut f)? - .update_data(|new_args| (new_args, partition_by, order_by)) - .try_transform_node(|(new_args, partition_by, order_by)| { - Ok(transform_vec(partition_by, &mut f)?.update_data( - |new_partition_by| (new_args, new_partition_by, order_by), - )) - })? - .try_transform_node(|(new_args, new_partition_by, order_by)| { - Ok( - transform_vec(order_by, &mut f)?.update_data(|new_order_by| { - (new_args, new_partition_by, new_order_by) - }), - ) - })? - .update_data(|(new_args, new_partition_by, new_order_by)| { - Expr::WindowFunction(WindowFunction::new( - fun, - new_args, - new_partition_by, - new_order_by, - window_frame, - null_treatment, - )) - }), + }) => map_until_stop_and_collect!( + transform_vec(args, &mut f), + partition_by, + transform_vec(partition_by, &mut f), + order_by, + transform_vec(order_by, &mut f) + )? + .update_data(|(new_args, new_partition_by, new_order_by)| { + Expr::WindowFunction(WindowFunction::new( + fun, + new_args, + new_partition_by, + new_order_by, + window_frame, + null_treatment, + )) + }), Expr::AggregateFunction(AggregateFunction { args, func_def, @@ -351,17 +333,15 @@ impl TreeNode for Expr { filter, order_by, null_treatment, - }) => transform_vec(args, &mut f)? - .update_data(|new_args| (new_args, filter, order_by)) - .try_transform_node(|(new_args, filter, order_by)| { - Ok(transform_option_box(filter, &mut f)? - .update_data(|new_filter| (new_args, new_filter, order_by))) - })? 
- .try_transform_node(|(new_args, new_filter, order_by)| { - Ok(transform_option_vec(order_by, &mut f)? - .update_data(|new_order_by| (new_args, new_filter, new_order_by))) - })? - .map_data(|(new_args, new_filter, new_order_by)| match func_def { + }) => map_until_stop_and_collect!( + transform_vec(args, &mut f), + filter, + transform_option_box(filter, &mut f), + order_by, + transform_option_vec(order_by, &mut f) + )? + .map_data( + |(new_args, new_filter, new_order_by)| match func_def { AggregateFunctionDefinition::BuiltIn(fun) => { Ok(Expr::AggregateFunction(AggregateFunction::new( fun, @@ -385,7 +365,8 @@ impl TreeNode for Expr { AggregateFunctionDefinition::Name(_) => { internal_err!("Function `Expr` with name should be resolved.") } - })?, + }, + )?, Expr::GroupingSet(grouping_set) => match grouping_set { GroupingSet::Rollup(exprs) => transform_vec(exprs, &mut f)? .update_data(|ve| Expr::GroupingSet(GroupingSet::Rollup(ve))), @@ -402,15 +383,14 @@ impl TreeNode for Expr { expr, list, negated, - }) => transform_box(expr, &mut f)? - .update_data(|new_expr| (new_expr, list)) - .try_transform_node(|(new_expr, list)| { - Ok(transform_vec(list, &mut f)? - .update_data(|new_list| (new_expr, new_list))) - })? - .update_data(|(new_expr, new_list)| { - Expr::InList(InList::new(new_expr, new_list, negated)) - }), + }) => map_until_stop_and_collect!( + transform_box(expr, &mut f), + list, + transform_vec(list, &mut f) + )? + .update_data(|(new_expr, new_list)| { + Expr::InList(InList::new(new_expr, new_list, negated)) + }), Expr::GetIndexedField(GetIndexedField { expr, field }) => { transform_box(expr, &mut f)?.update_data(|be| { Expr::GetIndexedField(GetIndexedField::new(be, field))