[SPARK-31389][SQL][TESTS] Add codegen-on test coverage for some tests in SQLMetricsSuite

### What changes were proposed in this pull request?
Add missing unit tests in SQLMetricsSuite so that the code-generated path is also covered.
**Additional coverage was added to the following tests; the common pattern is sketched below.**
Filter metrics, SortMergeJoin metrics, SortMergeJoin(outer) metrics, BroadcastHashJoin metrics, ShuffledHashJoin metrics, BroadcastHashJoin(outer) metrics, BroadcastNestedLoopJoin metrics, BroadcastLeftSemiJoinHash metrics, CartesianProduct metrics, SortMergeJoin(left-anti) metrics
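
Each updated test follows the same pattern: the original metric check is wrapped in a loop over (nodeId, enableWholeStage) pairs, so the same plan is verified with whole-stage codegen disabled and then enabled (the expected node id shifts because the WholeStageCodegen node changes the plan numbering). Abbreviated from the Filter metrics change in the diff below:

```scala
// Pattern used throughout this PR (abbreviated from the "Filter metrics" test):
// run the same assertion with whole-stage codegen off (false) and on (true).
Seq((0L, false), (1L, true)).foreach { case (nodeId, enableWholeStage) =>
  val df = person.filter('age < 25)
  testSparkPlanMetrics(df, 1, Map(
    nodeId -> (("Filter", Map("number of output rows" -> 1L)))),
    enableWholeStage
  )
}
```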

### Why are the changes needed?
The existing tests in SQLMetricsSuite only cover the interpreted path.
The tests should cover the code-generated path as well, since the code-generated path is the one most often used in production.
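
The new enableWholeStage parameter on testSparkPlanMetrics (second file in the diff) is what selects the path under test. The plumbing inside testSparkPlanMetricsWithPredicates is not shown in this excerpt, so the following is only a sketch of the kind of toggle involved, assuming the standard withSQLConf test helper and the spark.sql.codegen.wholeStage setting:

```scala
import org.apache.spark.sql.internal.SQLConf

// Illustrative sketch only (assumed mechanism, not code from this PR):
// run the same metrics check once per whole-stage codegen setting.
Seq(false, true).foreach { enableWholeStage =>
  withSQLConf(SQLConf.WHOLE_STAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) {
    // ... build the DataFrame and verify its SQL metrics here ...
  }
}
```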

The PR doesn't change test("Aggregate metrics") and test("ObjectHashAggregate metrics"). test("Aggregate metrics") covers the metrics when a HashAggregate is used; enabling codegen would force that test to use ObjectHashAggregate rather than the regular HashAggregate, and ObjectHashAggregate already has a test of its own. Therefore, enabling codegen for these two tests is not necessary.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
I added debug statements to the code to make sure both the code-generated and interpreted paths are being exercised.
I also used the IntelliJ debugger to confirm that the newly added unit tests do in fact exercise both the code-generated and interpreted paths.

Closes apache#28173 from sririshindra/SPARK-31389.

Authored-by: rishi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
rishi authored and dongjoon-hyun committed Apr 20, 2020
1 parent 69f9ee1 commit 4f8b03d
Showing 2 changed files with 109 additions and 83 deletions.
@@ -72,11 +72,14 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
test("Filter metrics") {
// Assume the execution plan is
// PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
val df = person.filter('age < 25)
testSparkPlanMetrics(df, 1, Map(
0L -> (("Filter", Map(
"number of output rows" -> 1L))))
)
Seq((0L, false), (1L, true)).foreach { case (nodeId, enableWholeStage) =>
val df = person.filter('age < 25)
testSparkPlanMetrics(df, 1, Map(
nodeId -> (("Filter", Map(
"number of output rows" -> 1L)))),
enableWholeStage
)
}
}

test("WholeStageCodegen metrics") {
@@ -246,18 +249,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
withTempView("testDataForJoin") {
// Assume the execution plan is
// ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
val df = spark.sql(
"SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 4L))),
2L -> (("Exchange", Map(
"records read" -> 4L,
"local blocks read" -> 2L,
"remote blocks read" -> 0L,
"shuffle records written" -> 2L))))
)
val query = "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a"
Seq((0L, 2L, false), (1L, 4L, true)).foreach { case (nodeId1, nodeId2, enableWholeStage) =>
val df = spark.sql(query)
testSparkPlanMetrics(df, 1, Map(
nodeId1 -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 4L))),
nodeId2 -> (("Exchange", Map(
"records read" -> 4L,
"local blocks read" -> 2L,
"remote blocks read" -> 0L,
"shuffle records written" -> 2L)))),
enableWholeStage
)
}
}
}

@@ -269,21 +275,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
withTempView("testDataForJoin") {
// Assume the execution plan is
// ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
val df = spark.sql(
"SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 8 because we read 6 rows in the left and 2 row in the right one
"number of output rows" -> 8L))))
)

val df2 = spark.sql(
"SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df2, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 8 because we read 6 rows in the left and 2 row in the right one
"number of output rows" -> 8L))))
)
val leftJoinQuery = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
"testData2.a = testDataForJoin.a"
val rightJoinQuery = "SELECT * FROM testDataForJoin right JOIN testData2 ON " +
"testData2.a = testDataForJoin.a"

Seq((leftJoinQuery, false), (leftJoinQuery, true), (rightJoinQuery, false),
(rightJoinQuery, true)).foreach { case (query, enableWholeStage) =>
val df = spark.sql(query)
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 8 because we read 6 rows in the left and 2 row in the right one
"number of output rows" -> 8L)))),
enableWholeStage
)
}
}
}

@@ -292,11 +298,14 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
// Assume the execution plan is
// ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
val df = df1.join(broadcast(df2), "key")
testSparkPlanMetrics(df, 2, Map(
1L -> (("BroadcastHashJoin", Map(
"number of output rows" -> 2L))))
)
Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
val df = df1.join(broadcast(df2), "key")
testSparkPlanMetrics(df, 2, Map(
nodeId -> (("BroadcastHashJoin", Map(
"number of output rows" -> 2L)))),
enableWholeStage
)
}
}

test("ShuffledHashJoin metrics") {
@@ -314,17 +323,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
// +- Exchange(nodeId = 5)
// +- Project(nodeId = 6)
// +- LocalTableScan(nodeId = 7)
val df = df1.join(df2, "key")
testSparkPlanMetrics(df, 1, Map(
1L -> (("ShuffledHashJoin", Map(
"number of output rows" -> 2L))),
2L -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L))),
5L -> (("Exchange", Map(
"shuffle records written" -> 10L,
"records read" -> 10L))))
)
Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
val df = df1.join(df2, "key")
testSparkPlanMetrics(df, 1, Map(
nodeId1 -> (("ShuffledHashJoin", Map(
"number of output rows" -> 2L))),
nodeId2 -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L))),
nodeId3 -> (("Exchange", Map(
"shuffle records written" -> 10L,
"records read" -> 10L)))),
enableWholeStage
)
}
}
}

@@ -333,17 +346,16 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
// Assume the execution plan is
// ... -> BroadcastHashJoin(nodeId = 0)
val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
testSparkPlanMetrics(df, 2, Map(
0L -> (("BroadcastHashJoin", Map(
"number of output rows" -> 5L))))
)

val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
testSparkPlanMetrics(df3, 2, Map(
0L -> (("BroadcastHashJoin", Map(
"number of output rows" -> 6L))))
)
Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
case (joinType, nodeId, numRows, enableWholeStage) =>
val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
testSparkPlanMetrics(df, 2, Map(
nodeId -> (("BroadcastHashJoin", Map(
"number of output rows" -> numRows)))),
enableWholeStage
)
}
}

test("BroadcastNestedLoopJoin metrics") {
@@ -353,13 +365,16 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
withTempView("testDataForJoin") {
// Assume the execution plan is
// ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
val df = spark.sql(
"SELECT * FROM testData2 left JOIN testDataForJoin ON " +
"testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
testSparkPlanMetrics(df, 3, Map(
1L -> (("BroadcastNestedLoopJoin", Map(
"number of output rows" -> 12L))))
)
val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
"testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
Seq(false, true).foreach { enableWholeStage =>
val df = spark.sql(query)
testSparkPlanMetrics(df, 2, Map(
0L -> (("BroadcastNestedLoopJoin", Map(
"number of output rows" -> 12L)))),
enableWholeStage
)
}
}
}
}
@@ -369,11 +384,14 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
// Assume the execution plan is
// ... -> BroadcastHashJoin(nodeId = 1)
val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
1L -> (("BroadcastHashJoin", Map(
"number of output rows" -> 2L))))
)
Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
nodeId -> (("BroadcastHashJoin", Map(
"number of output rows" -> 2L)))),
enableWholeStage
)
}
}

test("CartesianProduct metrics") {
@@ -383,11 +401,14 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
withTempView("testDataForJoin") {
// Assume the execution plan is
// ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0)
val df = spark.sql(
"SELECT * FROM testData2 JOIN testDataForJoin")
testSparkPlanMetrics(df, 1, Map(
0L -> (("CartesianProduct", Map("number of output rows" -> 12L))))
)
val query = "SELECT * FROM testData2 JOIN testDataForJoin"
Seq(true, false).foreach { enableWholeStage =>
val df = spark.sql(query)
testSparkPlanMetrics(df, 1, Map(
0L -> (("CartesianProduct", Map("number of output rows" -> 12L)))),
enableWholeStage
)
}
}
}
}
@@ -396,11 +417,14 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val anti = testData2.filter("a > 2")
withTempView("antiData") {
anti.createOrReplaceTempView("antiData")
val df = spark.sql(
"SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = antiData.a")
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map("number of output rows" -> 4L))))
)
val query = "SELECT * FROM testData2 ANTI JOIN antiData ON testData2.a = antiData.a"
Seq(false, true).foreach { enableWholeStage =>
val df = spark.sql(query)
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map("number of output rows" -> 4L)))),
enableWholeStage
)
}
}
}

@@ -214,14 +214,16 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
protected def testSparkPlanMetrics(
df: DataFrame,
expectedNumOfJobs: Int,
expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
expectedMetrics: Map[Long, (String, Map[String, Any])],
enableWholeStage: Boolean = false): Unit = {
val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
(nodeName, nodeMetrics.mapValues(expectedMetricValue =>
(actualMetricValue: Any) => {
actualMetricValue.toString.matches(expectedMetricValue.toString)
}))
}
testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates,
enableWholeStage)
}

/**
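Because enableWholeStage defaults to false, existing callers of testSparkPlanMetrics keep compiling and keep the previous (interpreted-path) behaviour; only tests that want the codegen-on run pass the flag explicitly. Hypothetical call sites (df and expectedMetrics are placeholders) illustrating both forms:

```scala
// Existing call sites are unchanged and keep the default behaviour (enableWholeStage = false).
testSparkPlanMetrics(df, 1, expectedMetrics)
// Updated call sites opt in to whole-stage codegen explicitly.
testSparkPlanMetrics(df, 1, expectedMetrics, enableWholeStage = true)
```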
