
Commit

fix style
taiyang-li committed Aug 2, 2024
1 parent 6badc77 commit bcc2dcb
Showing 6 changed files with 81 additions and 52 deletions.
@@ -19,7 +19,7 @@
public class CHDatasourceJniWrapper {

public native long nativeInitFileWriterWrapper(
-      String filePath, byte[] dataSchema, String formatHint);
+      String filePath, byte[] preferredSchema, String formatHint);

public native long nativeInitMergeTreeWriterWrapper(
byte[] plan,
@@ -217,7 +217,7 @@ class GlutenClickHouseNativeWriteTableSuite
.toDF()
}

ignore("supplier: csv to parquet- insert overwrite local directory") {
test("supplier: csv to parquet- insert overwrite local directory") {
withSource(supplierDF, "supplier") {
nativeWrite {
format =>
@@ -230,7 +230,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("supplier: csv to parquet- insert into one partition") {
test("supplier: csv to parquet- insert into one partition") {
val originViewName = "supplier"
lazy val create_columns = supplierSchema
.filterNot(f => f.name.equals("s_nationkey"))
@@ -256,7 +256,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test insert into dir") {
test("test insert into dir") {
withSource(genTestData(), "origin_table") {
nativeWrite {
format =>
@@ -272,7 +272,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test insert into partition") {
test("test insert into partition") {
def destination(format: String): (String, String, String) = {
val table_name = table_name_template.format(format)
val table_create_sql =
@@ -301,7 +301,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test CTAS") {
test("test CTAS") {
withSource(genTestData(), "origin_table") {
nativeWrite {
format =>
@@ -334,7 +334,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test insert into partition, bigo's case which incur InsertIntoHiveTable") {
test("test insert into partition, bigo's case which incur InsertIntoHiveTable") {
def destination(format: String): (String, String, String) = {
val table_name = table_name_template.format(format)
val table_create_sql = s"create table if not exists $table_name (" + fields_
@@ -368,7 +368,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test 1-col partitioned table") {
test("test 1-col partitioned table") {
val origin_table = "origin_table"
withSource(genTestData(), origin_table) {
nativeWrite2(
@@ -390,7 +390,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test 1-col partitioned table, partitioned by already ordered column") {
test("test 1-col partitioned table, partitioned by already ordered column") {
val origin_table = "origin_table"
def destination(format: String): (String, String, String) = {
val table_name = table_name_template.format(format)
@@ -414,7 +414,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test 2-col partitioned table") {
test("test 2-col partitioned table") {
val fields: ListMap[String, String] = ListMap(
("string_field", "string"),
("int_field", "int"),
@@ -531,7 +531,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test hive parquet/orc table with aggregated results") {
test("test hive parquet/orc table with aggregated results") {
val fields: ListMap[String, String] = ListMap(
("sum(int_field)", "bigint")
)
@@ -555,7 +555,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test 1-col partitioned + 1-col bucketed table") {
test("test 1-col partitioned + 1-col bucketed table") {
val origin_table = "origin_table"
withSource(genTestData(), origin_table) {
nativeWrite {
@@ -588,7 +588,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test table bucketed by all typed columns") {
test("test table bucketed by all typed columns") {
val fields: ListMap[String, String] = ListMap(
("string_field", "string"),
("int_field", "int"),
@@ -655,7 +655,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test 1-col partitioned + 2-col bucketed table") {
test("test 1-col partitioned + 2-col bucketed table") {
val fields: ListMap[String, String] = ListMap(
("string_field", "string"),
("int_field", "int"),
@@ -718,7 +718,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test consecutive blocks having same partition value") {
test("test consecutive blocks having same partition value") {
nativeWrite {
format =>
val table_name = table_name_template.format(format)
@@ -740,7 +740,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test decimal with rand()") {
test("test decimal with rand()") {
nativeWrite {
format =>
val table_name = table_name_template.format(format)
@@ -758,7 +758,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test partitioned by constant") {
test("test partitioned by constant") {
nativeWrite2 {
format =>
val table_name = s"tmp_123_$format"
@@ -775,7 +775,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test bucketed by constant") {
test("test bucketed by constant") {
nativeWrite {
format =>
val table_name = table_name_template.format(format)
@@ -793,7 +793,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test consecutive null values being partitioned") {
test("test consecutive null values being partitioned") {
nativeWrite {
format =>
val table_name = table_name_template.format(format)
@@ -811,7 +811,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test consecutive null values being bucketed") {
test("test consecutive null values being bucketed") {
nativeWrite {
format =>
val table_name = table_name_template.format(format)
@@ -829,7 +829,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test native write with empty dataset") {
test("test native write with empty dataset") {
nativeWrite2(
format => {
val table_name = "t_" + format
@@ -845,7 +845,7 @@ class GlutenClickHouseNativeWriteTableSuite
)
}

ignore("test native write with union") {
test("test native write with union") {
nativeWrite {
format =>
val table_name = "t_" + format
@@ -867,7 +867,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
}

ignore("test native write and non-native read consistency") {
test("test native write and non-native read consistency") {
nativeWrite2(
{
format =>
@@ -910,22 +910,28 @@ class GlutenClickHouseNativeWriteTableSuite
// which cause core dump. see https://github.com/apache/incubator-gluten/issues/6561
// for details.
val insert_sql =
s"""insert overwrite $table_name partition (day)
|select id as a,
| str_to_map(concat('t1:','a','&t2:','b'),'&',':'),
| struct('1', null) as c,
| '2024-01-08' as day
|from range(10)""".stripMargin
if (isSparkVersionLE("3.3")) {
s"""insert overwrite $table_name partition (day)
|select id as a,
| str_to_map(concat('t1:','a','&t2:','b'),'&',':'),
| struct('1', null) as c,
| '2024-01-08' as day
|from range(10)""".stripMargin
} else {
s"""insert overwrite $table_name partition (day)
|select id as a,
| map('t1', 'a', 't2', 'b'),
| struct('1', null) as c,
| '2024-01-08' as day
|from range(10)""".stripMargin
}
(table_name, create_sql, insert_sql)
},
(table_name, _) =>
-        if (isSparkVersionGE("3.4")) {
-          // FIXME: Don't Know Why Failed
-          compareResultsAgainstVanillaSpark(
-            s"select * from $table_name",
-            compareResult = true,
-            _ => {})
-        }
+        compareResultsAgainstVanillaSpark(
+          s"select * from $table_name",
+          compareResult = true,
+          _ => {})
)
}
}
2 changes: 0 additions & 2 deletions cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.cpp
@@ -37,9 +37,7 @@ OutputFormatFile::OutputFormatPtr ORCOutputFormatFile::createOutputFormat(const
auto res = std::make_shared<OutputFormatFile::OutputFormat>();
res->write_buffer = write_buffer_builder->build(file_uri);

std::cout << "xxx old_header:" << header.dumpStructure() << std::endl;
auto new_header = creatHeaderWithPreferredSchema(header);
std::cout << "xxx new_header:" << new_header.dumpStructure() << std::endl;
// TODO: align all spark orc config with ch orc config
auto format_settings = DB::getFormatSettings(context);
auto output_format = std::make_shared<DB::ORCBlockOutputFormat>(*(res->write_buffer), new_header, format_settings);
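Note on the rename above: `preferredSchema` (see the Java wrapper change) is the schema the native writer should favor when it rebuilds its output header via `creatHeaderWithPreferredSchema`. A minimal sketch of what such a conversion could look like, assuming ClickHouse's `castColumn` helper; the function name `withPreferredSchema` and its body are illustrative only, not this commit's actual implementation:

#include <Core/Block.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/castColumn.h>

using namespace DB;

// Hypothetical sketch: rebuild `block` so each column uses the type the
// caller prefers, matched by position.
static Block withPreferredSchema(const Block & block, const DataTypes & preferred_types)
{
    ColumnsWithTypeAndName columns;
    for (size_t i = 0; i < block.columns(); ++i)
    {
        const auto & src = block.getByPosition(i);
        const auto & dst_type = preferred_types.at(i);
        // Header blocks may carry a null column; create an empty column of
        // the preferred type in that case instead of casting.
        ColumnPtr casted;
        if (src.column)
            casted = castColumn(src, dst_type);
        else
            casted = dst_type->createColumn();
        columns.emplace_back(casted, dst_type, src.name);
    }
    return Block(columns);
}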
@@ -68,11 +68,6 @@ FormatFile::InputFormatPtr ORCFormatFile::createInputFormat(const DB::Block & he

format_settings.orc.skip_stripes = std::unordered_set<int>(skip_stripe_indices.begin(), skip_stripe_indices.end());

std::cout << "orc skip stripes:" << std::endl;
for (const auto & skip_strpe: format_settings.orc.skip_stripes)
std::cout << skip_strpe << std::endl;
std::cout << "read header:" << header.dumpStructure() << std::endl;

auto input_format = std::make_shared<DB::NativeORCBlockInputFormat>(*file_format->read_buffer, header, format_settings);
file_format->input = input_format;
return file_format;
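With the prints removed, stripe pruning in this reader is driven entirely by `format_settings.orc.skip_stripes`. A standalone sketch of exercising that setting, modeled on the `orc_read.cpp` example added later in this commit; the file path and stripe indices here are placeholder assumptions:

#include <Core/Block.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadBufferFromFile.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/NativeORCBlockInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>

using namespace DB;

int main()
{
    // Placeholder path: point this at any local ORC file with an int column "a".
    ReadBufferFromFile read_buffer("/tmp/data.orc");

    Block header({{nullptr, std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()), "a"}});

    FormatSettings format_settings;
    // Stripes whose indices appear in this set are skipped entirely;
    // everything else is read as usual.
    format_settings.orc.skip_stripes = {0, 2};

    InputFormatPtr format = std::make_shared<NativeORCBlockInputFormat>(read_buffer, header, format_settings);
    QueryPipeline pipeline(std::move(format));
    PullingPipelineExecutor reader(pipeline);

    Block block;
    while (reader.pull(block))
    {
        // Consume each block here.
    }
    return 0;
}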
@@ -26,17 +26,16 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDecimalBase.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/Exception.h>
#include <Common/GlutenStringUtils.h>
#include <Common/typeid_cast.h>
#include "DataTypes/DataTypesDecimal.h"

namespace DB
{
@@ -352,13 +351,6 @@ bool NormalFileReader::pull(DB::Chunk & chunk)
return false;

DB::Chunk raw_chunk = input_format->input->generate();

const auto & raw_columns = raw_chunk.getColumns();
for (const auto & raw_column: raw_columns)
{
debug::headColumn(raw_column, 10);
}

const size_t rows = raw_chunk.getNumRows();
if (!rows)
return false;
38 changes: 38 additions & 0 deletions cpp-ch/local-engine/examples/orc_read.cpp
@@ -0,0 +1,38 @@
#include <Core/Block.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadBufferFromFile.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/NativeORCBlockInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Common/DebugUtils.h>

using namespace DB;

int main()
{
String path = "/data1/liyang/cppproject/spark/spark-3.3.2-bin-hadoop3/t_orc/data.orc";
ReadBufferFromFile read_buffer(path);

DataTypePtr string_type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
DataTypes elem_types = {string_type, string_type};
Strings elem_names = {"1", "2"};
DataTypePtr tuple_type = std::make_shared<DataTypeTuple>(std::move(elem_types), std::move(elem_names));
tuple_type = std::make_shared<DataTypeNullable>(std::move(tuple_type));

Block header({
{nullptr, tuple_type, "c"},
// {nullptr, std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()), "a"},
});

FormatSettings format_settings;
InputFormatPtr format = std::make_shared<NativeORCBlockInputFormat>(read_buffer, header, format_settings);
QueryPipeline pipeline(std::move(format));
PullingPipelineExecutor reader(pipeline);
Block block;
reader.pull(block);
debug::headBlock(block, 10);
return 0;
}
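As a usage note on the example above: the header requests only the nested tuple column `c`, so the program doubles as a check of ORC column pruning, and the commented-out line shows how a second top-level column `a` could be requested. The hard-coded `path` is machine-specific and must point at a local ORC file before the program will run.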
