[GLUTEN-7028][CH][Part-15] [MINOR] Fix UTs #8364

Merged · 8 commits · Dec 30, 2024

@@ -340,20 +340,19 @@ class ClickhouseOptimisticTransaction(

var resultFiles =
(if (optionalStatsTracker.isDefined) {
committer.addedStatuses.map {
a =>
a.copy(stats =
optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
} else {
committer.addedStatuses
})
committer.addedStatuses.map { a =>
a.copy(stats = optionalStatsTracker.map(
_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
}
else {
committer.addedStatuses
})
.filter {
// In some cases, we can write out an empty `inputData`. Some examples of this (though,
// they may be fixed in the future) are the MERGE command when you delete with empty
// source, or empty target, or on disjoint tables. This is hard to catch before
// the write without collecting the DF ahead of time. Instead,
// we can return only the AddFiles that
// In some cases, we can write out an empty `inputData`. Some examples of this (though, they
// may be fixed in the future) are the MERGE command when you delete with empty source, or
// empty target, or on disjoint tables. This is hard to catch before the write without
// collecting the DF ahead of time. Instead, we can return only the AddFiles that
// a) actually add rows, or
// b) don't have any stats so we don't know the number of rows at all
case a: AddFile => a.numLogicalRecords.forall(_ > 0)
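The hunk above only re-wraps the existing logic: when a stats tracker is present, each committed file status is copied with the stats the tracker recorded for it, and the result keeps only files that actually add rows or whose row count is unknown. A minimal sketch of that pattern, using simplified stand-in types rather than Delta's AddFile/committer API (FileStatus and withRecordedStats are illustrative names):

case class FileStatus(name: String, stats: Option[String], numRecords: Option[Long])

def withRecordedStats(
    added: Seq[FileStatus],
    recorded: Option[Map[String, String]]): Seq[FileStatus] =
  recorded
    // if the tracker ran, prefer its recorded stats; otherwise keep whatever the file already had
    .map(statsByName => added.map(f => f.copy(stats = statsByName.get(f.name).orElse(f.stats))))
    .getOrElse(added)
    // keep files that actually add rows, or whose row count is unknown
    .filter(f => f.numRecords.forall(_ > 0))
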
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.vectorized.NativeExpressionEvaluator

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
@@ -37,11 +37,11 @@ case class DeltaFileCommitInfo(committer: FileDelayedCommitProtocol)
val addedFiles: ArrayBuffer[(Map[String, String], String)] =
new ArrayBuffer[(Map[String, String], String)]
override def apply(stat: NativeFileWriteResult): Unit = {
if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
addedFiles.append((Map.empty[String, String], stat.filename))
} else {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
val partitionValues = committer.parsePartitions(stat.partition_id)
addedFiles.append((partitionValues, stat.relativePath))
addedFiles.append((partitionValues, new Path(stat.relativePath).toUri.toString))
} else {
addedFiles.append((Map.empty[String, String], stat.filename))
}
}
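The change above inverts the check (the positive helper CHColumnarWrite.validatedPartitionID replaces a direct comparison against the sentinel) and, for partitioned writes, runs the relative path through Hadoop's Path so that characters such as spaces in partition values are percent-encoded before the file is recorded. A minimal sketch of that normalization, assuming only hadoop-common on the classpath (the sample path is illustrative):

import org.apache.hadoop.fs.Path

// Partition directories can contain characters that are not URI-safe,
// e.g. a whitespace value in a partition column.
val relativePath = "l_shipmode=REG AIR/part-00000.parquet"
val encoded = new Path(relativePath).toUri.toString
// expected to yield "l_shipmode=REG%20AIR/part-00000.parquet"
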

@@ -52,10 +52,10 @@ case class MergeTreeWriteResult(
path: Path,
modificationTime: Long,
hostName: Seq[String]): FileAction = {
val (partitionValues, part_path) = if (partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
(Map.empty[String, String], part_name)
} else {
val (partitionValues, part_path) = if (CHColumnarWrite.validatedPartitionID(partition_id)) {
(MergeTreePartitionUtils.parsePartitions(partition_id), s"$partition_id/$part_name")
} else {
(Map.empty[String, String], part_name)
}
val tags = Map[String, String](
"database" -> database,
@@ -88,7 +88,7 @@ case class MergeTreeWriteResult(
DeltaStatistics.NULL_COUNT -> ""
)
AddFile(
part_path,
new Path(part_path).toUri.toString,
partitionValues,
size_in_bytes,
modificationTime,
@@ -153,7 +153,7 @@ case class MergeTreeBasicWriteTaskStatsTracker() extends (MergeTreeWriteResult =
private var numFiles: Int = 0

def apply(stat: MergeTreeWriteResult): Unit = {
if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
partitions.append(new GenericInternalRow(Array[Any](stat.partition_id)))
}
numFiles += 1
@@ -158,10 +158,10 @@ case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol)
}

case class NativeFileWriteResult(filename: String, partition_id: String, record_count: Long) {
lazy val relativePath: String = if (partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
filename
} else {
lazy val relativePath: String = if (CHColumnarWrite.validatedPartitionID(partition_id)) {
s"$partition_id/$filename"
} else {
filename
}
}

@@ -212,7 +212,7 @@ case class NativeBasicWriteTaskStatsTracker(
private var numWrittenRows: Long = 0
override def apply(stat: NativeFileWriteResult): Unit = {
val absolutePath = s"$writeDir/${stat.relativePath}"
if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
basicWriteJobStatsTracker.newPartition(new GenericInternalRow(Array[Any](stat.partition_id)))
}
basicWriteJobStatsTracker.newFile(absolutePath)
@@ -233,7 +233,7 @@ case class FileCommitInfo(description: WriteJobDescription)

def apply(stat: NativeFileWriteResult): Unit = {
val tmpAbsolutePath = s"${description.path}/${stat.relativePath}"
if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
partitions += stat.partition_id
val customOutputPath =
description.customPartitionLocations.get(
@@ -325,5 +325,7 @@ object CHColumnarWrite {
case other => CHDeltaColumnarWrite(jobTrackerID, description, other)
}

val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"
private val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"

def validatedPartitionID(partitionID: String): Boolean = partitionID != EMPTY_PARTITION_ID
}
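The sentinel is now private to CHColumnarWrite, so callers express the condition through the positive helper instead of comparing against the literal. Restated as a self-contained sketch (the surrounding object and the relativePath helper are illustrative; only validatedPartitionID and EMPTY_PARTITION_ID mirror the diff):

object PartitionIds {
  // Sentinel emitted by the native writer for non-partitioned output;
  // kept private so callers cannot depend on the literal value.
  private val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"

  def validatedPartitionID(partitionID: String): Boolean =
    partitionID != EMPTY_PARTITION_ID
}

// Typical call site: prefix the file name with the partition directory
// only when a real partition id is present.
def relativePath(partitionId: String, filename: String): String =
  if (PartitionIds.validatedPartitionID(partitionId)) s"$partitionId/$filename"
  else filename
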
@@ -401,7 +401,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

testSparkVersionLE33("test cache mergetree data no partition columns") {
test("test cache mergetree data no partition columns") {

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
@@ -302,7 +302,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
}
}

testSparkVersionLE33("test mergetree path based table update") {
test("test mergetree path based table update") {
val dataPath = s"$basePath/lineitem_mergetree_update"
clearDataPath(dataPath)

@@ -315,78 +315,87 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.mode(SaveMode.Append)
.save(dataPath)

spark.sql(s"""
| update clickhouse.`$dataPath` set l_returnflag = 'Z' where l_orderkey = 12647
|""".stripMargin)

{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'Z'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)
/**
* TODO: new test for (spark.databricks.delta.stats.skipping -> true)
*
* Since the one-pipeline write collects stats, pruning will be more accurate for point
* queries. Let's add a new test once lightweight update and delete are implemented.
*/
withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
spark.sql(s"""
| update clickhouse.`$dataPath` set l_returnflag = 'Z' where l_orderkey = 12647
|""".stripMargin)

{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'Z'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
assert(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
}
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
assert(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths =
addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)
}

val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> "'X'"))
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> "'X'"))

{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'X'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)
{
val df = spark.read
.format("clickhouse")
.load(dataPath)
.where("l_returnflag = 'X'")
assertResult(1)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)

// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths =
addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
}
}

val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600572)(df.count())
}
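The update and delete tests are now wrapped in withSQLConf(("spark.databricks.delta.stats.skipping", "false")) because the one-pipeline write collects stats, which changes how many parts survive pruning for these point queries. withSQLConf comes from Spark's SQLTestUtils and restores the previous value when the block exits; a minimal sketch of the same set-run-restore pattern against a plain SparkSession (withConf is an illustrative stand-in, not the suite's helper):

import org.apache.spark.sql.SparkSession

def withConf[T](spark: SparkSession)(key: String, value: String)(body: => T): T = {
  val previous = spark.conf.getOption(key) // remember the old value, if any
  spark.conf.set(key, value)
  try body
  finally previous match {
    case Some(v) => spark.conf.set(key, v) // restore the original setting
    case None => spark.conf.unset(key)
  }
}

// e.g. run a block with Delta stats-based skipping disabled:
// withConf(spark)("spark.databricks.delta.stats.skipping", "false") { /* ... */ }
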

testSparkVersionLE33("test mergetree path based table delete") {
test("test mergetree path based table delete") {
val dataPath = s"$basePath/lineitem_mergetree_delete"
clearDataPath(dataPath)

@@ -399,32 +399,40 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.mode(SaveMode.Append)
.save(dataPath)

spark.sql(s"""
| delete from clickhouse.`$dataPath` where l_orderkey = 12647
|""".stripMargin)
val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600571)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
/**
* TODO: new test for (spark.databricks.delta.stats.skipping -> true)
*
* Since the one-pipeline write collects stats, pruning will be more accurate for point
* queries. Let's add a new test once lightweight update and delete are implemented.
*/
withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
spark.sql(s"""
| delete from clickhouse.`$dataPath` where l_orderkey = 12647
|""".stripMargin)
val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600571)(df.count())
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
val mergetreeScan = scanExec.head
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)

val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
val df1 = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(400089)(df1.count())
}
val mergetreeScan = scanExec.head
val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
// 4 parts belong to the first batch
// 2 parts belong to the second batch (1 actual updated part, 1 passively updated).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
assertResult(Array(2, 4))(filePaths.values.map(paths => paths.size).toArray.sorted)

val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
val df1 = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(400089)(df1.count())
}

test("test mergetree path based table upsert") {
@@ -436,7 +436,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}

testSparkVersionLE33("test mergetree write with the path based") {
testSparkVersionLE33("test mergetree write with the path based bucket table") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"

val sourceDF = spark.sql(s"""
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs purge")
}

testSparkVersionLE33("test mergetree write with the path based") {
testSparkVersionLE33("test mergetree write with the path based bucket table") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"

val sourceDF = spark.sql(s"""
@@ -496,7 +496,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
spark.sql("drop table lineitem_mergetree_bucket_s3")
}

testSparkVersionLE33("test mergetree write with the path based") {
testSparkVersionLE33("test mergetree write with the path based bucket table") {
val dataPath = s"s3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3"

val sourceDF = spark.sql(s"""
@@ -1826,7 +1826,7 @@ class GlutenClickHouseMergeTreeWriteSuite
runTPCHQueryBySQL(6, q6("lineitem_mergetree_case_sensitive")) { _ => }
}

testSparkVersionLE33("test mergetree with partition with whitespace") {
test("test mergetree with partition with whitespace") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_partition_with_whitespace;
|""".stripMargin)