diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
index 45ef62f680125..21e449f683081 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
@@ -19,7 +19,7 @@ public class CHDatasourceJniWrapper {

   public native long nativeInitFileWriterWrapper(
-      String filePath, byte[] dataSchema, String formatHint);
+      String filePath, byte[] preferredSchema, String formatHint);

   public native long nativeInitMergeTreeWriterWrapper(
       byte[] plan,
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
index 879bee9f03594..71344fbeb6765 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
@@ -217,7 +217,7 @@ class GlutenClickHouseNativeWriteTableSuite
       .toDF()
   }

-  ignore("supplier: csv to parquet- insert overwrite local directory") {
+  test("supplier: csv to parquet- insert overwrite local directory") {
     withSource(supplierDF, "supplier") {
       nativeWrite {
         format =>
@@ -230,7 +230,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("supplier: csv to parquet- insert into one partition") {
+  test("supplier: csv to parquet- insert into one partition") {
     val originViewName = "supplier"
     lazy val create_columns = supplierSchema
       .filterNot(f => f.name.equals("s_nationkey"))
@@ -256,7 +256,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test insert into dir") {
+  test("test insert into dir") {
     withSource(genTestData(), "origin_table") {
       nativeWrite {
         format =>
@@ -272,7 +272,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test insert into partition") {
+  test("test insert into partition") {
     def destination(format: String): (String, String, String) = {
       val table_name = table_name_template.format(format)
       val table_create_sql =
@@ -301,7 +301,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test CTAS") {
+  test("test CTAS") {
     withSource(genTestData(), "origin_table") {
       nativeWrite {
         format =>
@@ -334,7 +334,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test insert into partition, bigo's case which incur InsertIntoHiveTable") {
+  test("test insert into partition, bigo's case which incur InsertIntoHiveTable") {
     def destination(format: String): (String, String, String) = {
       val table_name = table_name_template.format(format)
       val table_create_sql = s"create table if not exists $table_name (" + fields_
@@ -368,7 +368,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test 1-col partitioned table") {
+  test("test 1-col partitioned table") {
     val origin_table = "origin_table"
     withSource(genTestData(), origin_table) {
       nativeWrite2(
@@ -390,7 +390,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test 1-col partitioned table, partitioned by already ordered column") {
+  test("test 1-col partitioned table, partitioned by already ordered column") {
     val origin_table = "origin_table"
     def destination(format: String): (String, String, String) = {
       val table_name = table_name_template.format(format)
@@ -414,7 +414,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test 2-col partitioned table") {
+  test("test 2-col partitioned table") {
     val fields: ListMap[String, String] = ListMap(
       ("string_field", "string"),
       ("int_field", "int"),
@@ -531,7 +531,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test hive parquet/orc table with aggregated results") {
+  test("test hive parquet/orc table with aggregated results") {
     val fields: ListMap[String, String] = ListMap(
       ("sum(int_field)", "bigint")
     )
@@ -555,7 +555,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test 1-col partitioned + 1-col bucketed table") {
+  test("test 1-col partitioned + 1-col bucketed table") {
     val origin_table = "origin_table"
     withSource(genTestData(), origin_table) {
       nativeWrite {
@@ -588,7 +588,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test table bucketed by all typed columns") {
+  test("test table bucketed by all typed columns") {
     val fields: ListMap[String, String] = ListMap(
       ("string_field", "string"),
       ("int_field", "int"),
@@ -655,7 +655,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test 1-col partitioned + 2-col bucketed table") {
+  test("test 1-col partitioned + 2-col bucketed table") {
     val fields: ListMap[String, String] = ListMap(
       ("string_field", "string"),
       ("int_field", "int"),
@@ -718,7 +718,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test consecutive blocks having same partition value") {
+  test("test consecutive blocks having same partition value") {
     nativeWrite {
       format =>
         val table_name = table_name_template.format(format)
@@ -740,7 +740,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test decimal with rand()") {
+  test("test decimal with rand()") {
     nativeWrite {
       format =>
         val table_name = table_name_template.format(format)
@@ -758,7 +758,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test partitioned by constant") {
+  test("test partitioned by constant") {
     nativeWrite2 {
       format =>
         val table_name = s"tmp_123_$format"
@@ -775,7 +775,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test bucketed by constant") {
+  test("test bucketed by constant") {
     nativeWrite {
       format =>
         val table_name = table_name_template.format(format)
@@ -793,7 +793,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test consecutive null values being partitioned") {
+  test("test consecutive null values being partitioned") {
     nativeWrite {
       format =>
         val table_name = table_name_template.format(format)
@@ -811,7 +811,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test consecutive null values being bucketed") {
+  test("test consecutive null values being bucketed") {
     nativeWrite {
       format =>
         val table_name = table_name_template.format(format)
@@ -829,7 +829,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test native write with empty dataset") {
+  test("test native write with empty dataset") {
     nativeWrite2(
       format => {
         val table_name = "t_" + format
@@ -845,7 +845,7 @@ class GlutenClickHouseNativeWriteTableSuite
     )
   }

-  ignore("test native write with union") {
+  test("test native write with union") {
     nativeWrite {
       format =>
         val table_name = "t_" + format
@@ -867,7 +867,7 @@ class GlutenClickHouseNativeWriteTableSuite
     }
   }

-  ignore("test native write and non-native read consistency") {
+  test("test native write and non-native read consistency") {
     nativeWrite2(
       {
         format =>
@@ -910,22 +910,28 @@ class GlutenClickHouseNativeWriteTableSuite
         // which cause core dump. see https://github.com/apache/incubator-gluten/issues/6561
         // for details.
         val insert_sql =
-          s"""insert overwrite $table_name partition (day)
-             |select id as a,
-             |       str_to_map(concat('t1:','a','&t2:','b'),'&',':'),
-             |       struct('1', null) as c,
-             |       '2024-01-08' as day
-             |from range(10)""".stripMargin
+          if (isSparkVersionLE("3.3")) {
+            s"""insert overwrite $table_name partition (day)
+               |select id as a,
+               |       str_to_map(concat('t1:','a','&t2:','b'),'&',':'),
+               |       struct('1', null) as c,
+               |       '2024-01-08' as day
+               |from range(10)""".stripMargin
+          } else {
+            s"""insert overwrite $table_name partition (day)
+               |select id as a,
+               |       map('t1', 'a', 't2', 'b'),
+               |       struct('1', null) as c,
+               |       '2024-01-08' as day
+               |from range(10)""".stripMargin
+          }
         (table_name, create_sql, insert_sql)
       },
       (table_name, _) =>
-        if (isSparkVersionGE("3.4")) {
-          // FIXME: Don't Know Why Failed
-          compareResultsAgainstVanillaSpark(
-            s"select * from $table_name",
-            compareResult = true,
-            _ => {})
-        }
+        compareResultsAgainstVanillaSpark(
+          s"select * from $table_name",
+          compareResult = true,
+          _ => {})
     )
   }
 }
diff --git a/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.cpp b/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.cpp
index 9ba8d748dc67c..c54f2e7b33bf2 100644
--- a/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.cpp
@@ -37,9 +37,7 @@ OutputFormatFile::OutputFormatPtr ORCOutputFormatFile::createOutputFormat(const
     auto res = std::make_shared();
     res->write_buffer = write_buffer_builder->build(file_uri);

-    std::cout << "xxx old_header:" << header.dumpStructure() << std::endl;
     auto new_header = creatHeaderWithPreferredSchema(header);
-    std::cout << "xxx new_header:" << new_header.dumpStructure() << std::endl;
     // TODO: align all spark orc config with ch orc config
     auto format_settings = DB::getFormatSettings(context);
     auto output_format = std::make_shared(*(res->write_buffer), new_header, format_settings);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
index bce4c4d457833..1c57010751c0f 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
@@ -68,11 +68,6 @@ FormatFile::InputFormatPtr ORCFormatFile::createInputFormat(const DB::Block & he
     format_settings.orc.skip_stripes = std::unordered_set(skip_stripe_indices.begin(), skip_stripe_indices.end());

-    std::cout << "orc skip stripes:" << std::endl;
-    for (const auto & skip_strpe: format_settings.orc.skip_stripes)
-        std::cout << skip_strpe << std::endl;
-    std::cout << "read header:" << header.dumpStructure() << std::endl;
-
     auto input_format = std::make_shared(*file_format->read_buffer, header, format_settings);
     file_format->input = input_format;
     return file_format;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index 03beca5cf88ab..5b872244eab5c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -26,17 +26,16 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
 #include
+#include "DataTypes/DataTypesDecimal.h"

 namespace DB
 {
@@ -352,13 +351,6 @@ bool NormalFileReader::pull(DB::Chunk & chunk)
         return false;

     DB::Chunk raw_chunk = input_format->input->generate();
-
-    const auto & raw_columns = raw_chunk.getColumns();
-    for (const auto & raw_column: raw_columns)
-    {
-        debug::headColumn(raw_column, 10);
-    }
-
     const size_t rows = raw_chunk.getNumRows();
     if (!rows)
         return false;
diff --git a/cpp-ch/local-engine/examples/orc_read.cpp b/cpp-ch/local-engine/examples/orc_read.cpp
new file mode 100644
index 0000000000000..489d57bd1d1f6
--- /dev/null
+++ b/cpp-ch/local-engine/examples/orc_read.cpp
@@ -0,0 +1,38 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace DB;
+
+int main()
+{
+    String path = "/data1/liyang/cppproject/spark/spark-3.3.2-bin-hadoop3/t_orc/data.orc";
+    ReadBufferFromFile read_buffer(path);
+
+    DataTypePtr string_type = std::make_shared(std::make_shared());
+    DataTypes elem_types = {string_type, string_type};
+    Strings elem_names = {"1", "2"};
+    DataTypePtr tuple_type = std::make_shared(std::move(elem_types), std::move(elem_names));
+    tuple_type = std::make_shared(std::move(tuple_type));
+
+    Block header({
+        {nullptr, tuple_type, "c"},
+        // {nullptr, std::make_shared(std::make_shared()), "a"},
+    });
+
+    FormatSettings format_settings;
+    InputFormatPtr format = std::make_shared(read_buffer, header, format_settings);
+    QueryPipeline pipeline(std::move(format));
+    PullingPipelineExecutor reader(pipeline);
+    Block block;
+    reader.pull(block);
+    debug::headBlock(block, 10);
+    return 0;
+}
\ No newline at end of file