[GLUTEN-7028][CH][Part-15] [MINOR] Fix UTs (#8364)
* [Fix UT] Set spark.databricks.delta.stats.skipping to false

* [Fix UT] Bucket table not supported

* [Fix UT] 'test cache mergetree data no partition columns' was already fixed by #8346

* [UT] Enable previously ignored tests

* [MINOR REFACTOR] Pass by const reference instead of pass by value

* [MINOR REFACTOR] Introduce the validatedPartitionID helper

* [Fix Bug] decode part name

* clang 19 fix
baibaichen authored Dec 30, 2024
1 parent 49f6657 commit eaf6548
Showing 16 changed files with 146 additions and 126 deletions.
@@ -340,20 +340,19 @@ class ClickhouseOptimisticTransaction(

var resultFiles =
(if (optionalStatsTracker.isDefined) {
committer.addedStatuses.map {
a =>
a.copy(stats =
optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
} else {
committer.addedStatuses
})
committer.addedStatuses.map { a =>
a.copy(stats = optionalStatsTracker.map(
_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
}
else {
committer.addedStatuses
})
.filter {
// In some cases, we can write out an empty `inputData`. Some examples of this (though,
// they may be fixed in the future) are the MERGE command when you delete with empty
// source, or empty target, or on disjoint tables. This is hard to catch before
// the write without collecting the DF ahead of time. Instead,
// we can return only the AddFiles that
// In some cases, we can write out an empty `inputData`. Some examples of this (though, they
// may be fixed in the future) are the MERGE command when you delete with empty source, or
// empty target, or on disjoint tables. This is hard to catch before the write without
// collecting the DF ahead of time. Instead, we can return only the AddFiles that
// a) actually add rows, or
// b) don't have any stats so we don't know the number of rows at all
case a: AddFile => a.numLogicalRecords.forall(_ > 0)
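The filter above keeps only the AddFile actions that either add rows or carry no row-count statistics at all. A minimal sketch of that predicate, using a simplified stand-in for Delta's AddFile action (the case class and object names below are illustrative, not the real org.apache.spark.sql.delta.actions.AddFile):

// Simplified stand-in for Delta's AddFile; only the field used by the predicate is modeled.
final case class AddFileStub(path: String, numLogicalRecords: Option[Long])

object NonEmptyAddFiles {
  // Option.forall returns true for None, so files without any stats are kept too.
  def keep(files: Seq[AddFileStub]): Seq[AddFileStub] =
    files.filter(a => a.numLogicalRecords.forall(_ > 0))

  def main(args: Array[String]): Unit = {
    val files = Seq(
      AddFileStub("part-0", Some(0L)),  // empty output: dropped
      AddFileStub("part-1", Some(42L)), // adds rows: kept
      AddFileStub("part-2", None)       // no stats: kept
    )
    println(keep(files).map(_.path)) // List(part-1, part-2)
  }
}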
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.vectorized.NativeExpressionEvaluator

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
@@ -37,11 +37,11 @@ case class DeltaFileCommitInfo(committer: FileDelayedCommitProtocol)
val addedFiles: ArrayBuffer[(Map[String, String], String)] =
new ArrayBuffer[(Map[String, String], String)]
override def apply(stat: NativeFileWriteResult): Unit = {
if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
addedFiles.append((Map.empty[String, String], stat.filename))
} else {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
val partitionValues = committer.parsePartitions(stat.partition_id)
addedFiles.append((partitionValues, stat.relativePath))
addedFiles.append((partitionValues, new Path(stat.relativePath).toUri.toString))
} else {
addedFiles.append((Map.empty[String, String], stat.filename))
}
}

@@ -52,10 +52,10 @@ case class MergeTreeWriteResult(
path: Path,
modificationTime: Long,
hostName: Seq[String]): FileAction = {
val (partitionValues, part_path) = if (partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
(Map.empty[String, String], part_name)
} else {
val (partitionValues, part_path) = if (CHColumnarWrite.validatedPartitionID(partition_id)) {
(MergeTreePartitionUtils.parsePartitions(partition_id), s"$partition_id/$part_name")
} else {
(Map.empty[String, String], part_name)
}
val tags = Map[String, String](
"database" -> database,
@@ -88,7 +88,7 @@ case class MergeTreeWriteResult(
DeltaStatistics.NULL_COUNT -> ""
)
AddFile(
part_path,
new Path(part_path).toUri.toString,
partitionValues,
size_in_bytes,
modificationTime,
@@ -153,7 +153,7 @@ case class MergeTreeBasicWriteTaskStatsTracker() extends (MergeTreeWriteResult =
private var numFiles: Int = 0

def apply(stat: MergeTreeWriteResult): Unit = {
if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
partitions.append(new GenericInternalRow(Array[Any](stat.partition_id)))
}
numFiles += 1
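The switch to new Path(part_path).toUri.toString above corresponds to the "decode part name" item in the commit message: routing the part path through Hadoop's Path-to-URI conversion yields a URI-style relative path (characters that are illegal in a URI, such as spaces, come out percent-encoded), which lines up with the percent-encoded paths Delta records in AddFile. A minimal sketch under that assumption; the part path below is made up:

import org.apache.hadoop.fs.Path

object PartPathNormalization {
  def main(args: Array[String]): Unit = {
    // Hypothetical MergeTree part path whose partition value contains a space.
    val rawPartPath = "l_returnflag=N A/all_1_1_0"
    // Path -> URI conversion percent-encodes characters that are not legal in a URI,
    // so the space becomes %20 and the result is a stable, URI-style relative path.
    val normalized = new Path(rawPartPath).toUri.toString
    println(normalized) // l_returnflag=N%20A/all_1_1_0
  }
}

This is also consistent with re-enabling "test mergetree with partition with whitespace" further down in this commit.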
@@ -158,10 +158,10 @@ case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol)
}

case class NativeFileWriteResult(filename: String, partition_id: String, record_count: Long) {
lazy val relativePath: String = if (partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
filename
} else {
lazy val relativePath: String = if (CHColumnarWrite.validatedPartitionID(partition_id)) {
s"$partition_id/$filename"
} else {
filename
}
}

@@ -212,7 +212,7 @@ case class NativeBasicWriteTaskStatsTracker(
private var numWrittenRows: Long = 0
override def apply(stat: NativeFileWriteResult): Unit = {
val absolutePath = s"$writeDir/${stat.relativePath}"
if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
basicWriteJobStatsTracker.newPartition(new GenericInternalRow(Array[Any](stat.partition_id)))
}
basicWriteJobStatsTracker.newFile(absolutePath)
@@ -233,7 +233,7 @@ case class FileCommitInfo(description: WriteJobDescription)

def apply(stat: NativeFileWriteResult): Unit = {
val tmpAbsolutePath = s"${description.path}/${stat.relativePath}"
if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
partitions += stat.partition_id
val customOutputPath =
description.customPartitionLocations.get(
@@ -325,5 +325,7 @@ object CHColumnarWrite {
case other => CHDeltaColumnarWrite(jobTrackerID, description, other)
}

val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"
private val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"

def validatedPartitionID(partitionID: String): Boolean = partitionID != EMPTY_PARTITION_ID
}
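The hunk above makes EMPTY_PARTITION_ID private and funnels every call site through the new validatedPartitionID helper instead of comparing against the sentinel directly. A small self-contained sketch of the helper and of the relativePath logic it guards; only validatedPartitionID and EMPTY_PARTITION_ID are taken from the diff, the surrounding object and example values are illustrative:

object PartitionIdSketch {
  // Sentinel the native writer reports for files with no partition columns (value from the diff).
  private val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"

  // True when the native writer reported a real partition id.
  def validatedPartitionID(partitionID: String): Boolean =
    partitionID != EMPTY_PARTITION_ID

  // Mirrors NativeFileWriteResult.relativePath: partitioned files live under
  // "<partition_id>/<filename>", non-partitioned files sit directly in the write dir.
  def relativePath(partitionId: String, filename: String): String =
    if (validatedPartitionID(partitionId)) s"$partitionId/$filename" else filename

  def main(args: Array[String]): Unit = {
    println(relativePath("l_shipdate=1996-01-01", "part-00000.parquet")) // l_shipdate=1996-01-01/part-00000.parquet
    println(relativePath("__NO_PARTITION_ID__", "part-00000.parquet"))   // part-00000.parquet
  }
}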
@@ -401,7 +401,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

testSparkVersionLE33("test cache mergetree data no partition columns") {
test("test cache mergetree data no partition columns") {

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
@@ -302,7 +302,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
}
}

testSparkVersionLE33("test mergetree path based table update") {
test("test mergetree path based table update") {
val dataPath = s"$basePath/lineitem_mergetree_update"
clearDataPath(dataPath)

@@ -315,78 +315,87 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.mode(SaveMode.Append)
.save(dataPath)

spark.sql(s"""
| update clickhouse.`$dataPath` set l_returnflag = 'Z' where l_orderkey = 12647
|""".stripMargin)

{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'Z'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)
/**
* TODO: new test for (spark.databricks.delta.stats.skipping -> true)
*
* One-pipeline write collects stats, so pruning will be more accurate for point
* queries. Let's add a new test once lightweight update and delete are implemented.
*/
withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
spark.sql(s"""
| update clickhouse.`$dataPath` set l_returnflag = 'Z' where l_orderkey = 12647
|""".stripMargin)

{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'Z'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
assert(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
}
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
assert(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths =
addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
}

val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> "'X'"))
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> "'X'"))

{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'X'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)
{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'X'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)

// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths =
addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
}
}

val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600572)(df.count())
}

testSparkVersionLE33("test mergetree path based table delete") {
test("test mergetree path based table delete") {
val dataPath = s"$basePath/lineitem_mergetree_delete"
clearDataPath(dataPath)

@@ -399,32 +408,40 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.mode(SaveMode.Append)
.save(dataPath)

spark.sql(s"""
| delete from clickhouse.`$dataPath` where l_orderkey = 12647
|""".stripMargin)
val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600571)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
/**
* TODO: new test for (spark.databricks.delta.stats.skipping -> true)
*
* One-pipeline write collects stats, so pruning will be more accurate for point
* queries. Let's add a new test once lightweight update and delete are implemented.
*/
withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
spark.sql(s"""
| delete from clickhouse.`$dataPath` where l_orderkey = 12647
|""".stripMargin)
val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600571)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
val mergetreeScan = scanExec.head
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)

val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
val df1 = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(400089)(df1.count())
}
val mergetreeScan = scanExec.head
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)

val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
val df1 = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(400089)(df1.count())
}

test("test mergetree path based table upsert") {
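Both of the tests above wrap their DML and assertions in withSQLConf(("spark.databricks.delta.stats.skipping", "false")). Outside the SQLTestUtils harness, roughly the same effect can be sketched by setting and restoring the conf around the body; the helper below is illustrative and not part of the commit:

import org.apache.spark.sql.SparkSession

object StatsSkippingOff {
  // Runs `body` with Delta stats-based skipping disabled, then restores the previous value.
  def withStatsSkippingOff[T](spark: SparkSession)(body: => T): T = {
    val key = "spark.databricks.delta.stats.skipping"
    val previous = spark.conf.getOption(key)
    spark.conf.set(key, "false")
    try body
    finally previous match {
      case Some(value) => spark.conf.set(key, value)
      case None => spark.conf.unset(key)
    }
  }
}

A test could then call withStatsSkippingOff(spark) { ... } around the delete statement and the follow-up assertions, mirroring the delete case above.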
@@ -436,7 +436,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}

testSparkVersionLE33("test mergetree write with the path based") {
testSparkVersionLE33("test mergetree write with the path based bucket table") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"

val sourceDF = spark.sql(s"""
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs purge")
}

testSparkVersionLE33("test mergetree write with the path based") {
testSparkVersionLE33("test mergetree write with the path based bucket table") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"

val sourceDF = spark.sql(s"""
@@ -496,7 +496,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
spark.sql("drop table lineitem_mergetree_bucket_s3")
}

testSparkVersionLE33("test mergetree write with the path based") {
testSparkVersionLE33("test mergetree write with the path based bucket table") {
val dataPath = s"s3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3"

val sourceDF = spark.sql(s"""
@@ -1826,7 +1826,7 @@ class GlutenClickHouseMergeTreeWriteSuite
runTPCHQueryBySQL(6, q6("lineitem_mergetree_case_sensitive")) { _ => }
}

testSparkVersionLE33("test mergetree with partition with whitespace") {
test("test mergetree with partition with whitespace") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_partition_with_whitespace;
|""".stripMargin)