Skip to content

Commit

Permalink
Merge remote-tracking branch 'spark/branch-3.5' into spark-3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
ejblanco committed Jan 7, 2025
2 parents 24ed10a + 5a91172 commit fc601e2
Show file tree
Hide file tree
Showing 104 changed files with 597 additions and 241 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
tpcds=false
docker=false
fi
build=`./dev/is-changed.py -m "core,unsafe,kvstore,avro,utils,network-common,network-shuffle,repl,launcher,examples,sketch,graphx,catalyst,hive-thriftserver,streaming,sql-kafka-0-10,streaming-kafka-0-10,mllib-local,mllib,yarn,mesos,kubernetes,hadoop-cloud,spark-ganglia-lgpl,sql,hive"`
build=`./dev/is-changed.py -m "core,unsafe,kvstore,avro,utils,network-common,network-shuffle,repl,launcher,examples,sketch,graphx,catalyst,hive-thriftserver,streaming,sql-kafka-0-10,streaming-kafka-0-10,mllib-local,mllib,yarn,mesos,kubernetes,hadoop-cloud,spark-ganglia-lgpl,sql,hive,connect,protobuf,api"`
precondition="
{
\"build\": \"$build\",
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 3.5.3
Version: 3.5.4
Title: R Front End for 'Apache Spark'
Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
Authors@R:
Expand Down
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ install_app() {
local binary="${_DIR}/$6"
local remote_tarball="${mirror_host}/${url_path}${url_query}"
local local_checksum="${local_tarball}.${checksum_suffix}"
local remote_checksum="https://archive.apache.org/dist/${url_path}.${checksum_suffix}"
local remote_checksum="${mirror_host}/${url_path}.${checksum_suffix}${url_query}"

local curl_opts="--silent --show-error -L"
local wget_opts="--no-verbose"
Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion connector/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hdfs.BlockMissingException
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
Expand Down Expand Up @@ -140,6 +141,8 @@ private[sql] object AvroUtils extends Logging {
try {
Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
} catch {
case e: BlockMissingException =>
throw new SparkException(s"Could not read file: $path", e)
case e: IOException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $path", e)
Expand Down
2 changes: 1 addition & 1 deletion connector/connect/client/jvm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object IntegrationTestUtils {

// Redirect server log into console
"--conf",
s"spark.driver.extraJavaOptions=-Dlog4j.configuration=$log4j2")
s"spark.driver.extraJavaOptions=-Dlog4j.configurationFile=$log4j2")
} else Seq.empty
}

Expand Down
2 changes: 1 addition & 1 deletion connector/connect/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion connector/connect/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
Expand Down Expand Up @@ -237,7 +238,14 @@ private[connect] class ExecuteHolder(
// it does.
responseObserver.removeAll()
// post closed to UI
eventsManager.postClosed()
try {
  eventsManager.postClosed()
} catch {
  // Best effort: a failure while posting the closed event must not propagate, otherwise the
  // wrong error code is returned to the user: SPARK-49688. The issue was fixed by completely
  // refactoring the code in Spark 4.0. Fatal errors are deliberately not caught here.
  case NonFatal(e) =>
    // Pass the throwable along so the stack trace is logged, not only the message.
    logError(s"Error posting closed event to UI: ${e.getMessage()}", e)
}
}
// interrupt any attached grpcResponseSenders
grpcResponseSenders.foreach(_.interrupt())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class SparkConnectReattachExecuteHandler(
extends Logging {

def handle(v: proto.ReattachExecuteRequest): Unit = {
// An exception will be raised if the session is not available.
val sessionHolder =
SparkConnectService.getIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
assert(sessionHolder != null)

val executeHolder = SparkConnectService.executionManager
.getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
.getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ object SparkConnectService extends Logging {
userSessionMapping.invalidateAll()
}

/**
 * Invalidates the `userSessionMapping` entry keyed by `(userId, sessionId)`.
 * Used for testing.
 */
private[connect] def invalidateSession(userId: String, sessionId: String): Unit = {
  val sessionKey = (userId, sessionId)
  userSessionMapping.invalidate(sessionKey)
}

/**
* Used for testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ trait SparkConnectServerTest extends SharedSparkSession {
withSparkEnvConfs((Connect.CONNECT_GRPC_BINDING_PORT.key, serverPort.toString)) {
SparkConnectService.start(spark.sparkContext)
}
// register udf directly on the server, we're not testing client UDFs here...
val serverSession =
SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
}

override def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
}
}

test("reattach after connection expired") {
  withClient { client =>
    withRawBlockingStub { stub =>
      // Drop the default session on the server side to emulate its expiration.
      SparkConnectService.invalidateSession(defaultUserId, defaultSessionId)

      // With the session gone, the reattach is expected to fail on the first read.
      val randomOperationId = UUID.randomUUID().toString
      val reattachRequest = buildReattachExecuteRequest(randomOperationId, None)
      val responses = stub.reattachExecute(reattachRequest)
      val error = intercept[StatusRuntimeException](responses.next())
      assert(error.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND"))
    }
  }
}

test("raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error") {
withRawBlockingStub { stub =>
val iter = stub.executePlan(buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY)))
Expand Down Expand Up @@ -347,6 +364,10 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
}

test("long sleeping query") {
// register udf directly on the server, we're not testing client UDFs here...
val serverSession =
SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
// query will be sleeping and not returning results, while having multiple reattach
withSparkEnvConfs(
(Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key, "1s")) {
Expand Down
2 changes: 1 addition & 1 deletion connector/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import java.sql.Connection
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, SparkSQLFeatureNotSupportedException}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.MsSQLServerDatabaseOnDocker
import org.apache.spark.sql.types._
Expand All @@ -39,6 +43,17 @@ import org.apache.spark.tags.DockerTest
@DockerTest
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {

/**
 * Returns the value of `getExternalEngineQuery` from the RDD behind the scan node of
 * `executedPlan`. The RDD is cast to [[JDBCRDD]], so the scan must be JDBC-backed.
 */
def getExternalEngineQuery(executedPlan: SparkPlan): String = {
  val scanRdd = getExternalEngineRdd(executedPlan)
  scanRdd.asInstanceOf[JDBCRDD].getExternalEngineQuery
}

/**
 * Returns the RDD of the first [[RowDataSourceScanExec]] node found in `executedPlan`.
 *
 * @param executedPlan the physical plan to search
 * @return the `rdd` of the first matching scan node
 * @throws NoSuchElementException if the plan contains no [[RowDataSourceScanExec]] node
 */
def getExternalEngineRdd(executedPlan: SparkPlan): RDD[InternalRow] = {
  // collectFirst stops at the first match instead of building the full list of scan nodes,
  // and lets us report a descriptive error rather than a bare "head of empty list".
  executedPlan
    .collectFirst { case scan: RowDataSourceScanExec => scan.rdd }
    .getOrElse {
      throw new NoSuchElementException(
        s"No RowDataSourceScanExec found in executed plan:\n$executedPlan")
    }
}

override def excluded: Seq[String] = Seq(
"simple scan with OFFSET",
"simple scan with LIMIT and OFFSET",
Expand Down Expand Up @@ -137,4 +152,68 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD
"WHERE (dept > 1 AND ((name LIKE 'am%') = (name LIKE '%y')))")
assert(df3.collect().length == 3)
}

test("SPARK-50087: SqlServer handle booleans in CASE WHEN test") {
  val sparkQuery =
    s"""|SELECT * FROM $catalogName.employee
        |WHERE CASE WHEN name = 'Legolas' THEN name = 'Elf' ELSE NOT (name = 'Wizard') END
        |""".stripMargin
  val df = sql(sparkQuery)

  // Boolean THEN/ELSE branches are expected to be rendered as IIF(..., 1, 0) and the whole
  // CASE expression compared against 1.
  // scalastyle:off
  val expectedPushedQuery =
    """SELECT "dept","name","salary","bonus" FROM "employee" WHERE (CASE WHEN ("name" = 'Legolas') THEN IIF(("name" = 'Elf'), 1, 0) ELSE IIF(("name" <> 'Wizard'), 1, 0) END = 1) """
  // scalastyle:on
  val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)
  assert(pushedQuery == expectedPushedQuery)

  // Also run the query so that the generated statement is actually executed.
  df.collect()
}

test("SPARK-50087: SqlServer handle booleans in CASE WHEN with always true test") {
  val sparkQuery =
    s"""|SELECT * FROM $catalogName.employee
        |WHERE CASE WHEN (name = 'Legolas') THEN (name = 'Elf') ELSE (1=1) END
        |""".stripMargin
  val df = sql(sparkQuery)

  // The always-true ELSE branch is expected to appear as the literal 1 in the pushed query.
  // scalastyle:off
  val expectedPushedQuery =
    """SELECT "dept","name","salary","bonus" FROM "employee" WHERE (CASE WHEN ("name" = 'Legolas') THEN IIF(("name" = 'Elf'), 1, 0) ELSE 1 END = 1) """
  // scalastyle:on
  val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)
  assert(pushedQuery == expectedPushedQuery)

  // Also run the query so that the generated statement is actually executed.
  df.collect()
}

test("SPARK-50087: SqlServer handle booleans in nested CASE WHEN test") {
  val sparkQuery =
    s"""|SELECT * FROM $catalogName.employee
        |WHERE CASE WHEN (name = 'Legolas') THEN
        | CASE WHEN (name = 'Elf') THEN (name = 'Elrond') ELSE (name = 'Gandalf') END
        | ELSE (name = 'Sauron') END
        |""".stripMargin
  val df = sql(sparkQuery)

  // The inner boolean CASE is expected to be compared against 1 and wrapped in IIF itself,
  // just like the leaf boolean branches.
  // scalastyle:off
  val expectedPushedQuery =
    """SELECT "dept","name","salary","bonus" FROM "employee" WHERE (CASE WHEN ("name" = 'Legolas') THEN IIF((CASE WHEN ("name" = 'Elf') THEN IIF(("name" = 'Elrond'), 1, 0) ELSE IIF(("name" = 'Gandalf'), 1, 0) END = 1), 1, 0) ELSE IIF(("name" = 'Sauron'), 1, 0) END = 1) """
  // scalastyle:on
  val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)
  assert(pushedQuery == expectedPushedQuery)

  // Also run the query so that the generated statement is actually executed.
  df.collect()
}

test("SPARK-50087: SqlServer handle non-booleans in nested CASE WHEN test") {
  val sparkQuery =
    s"""|SELECT * FROM $catalogName.employee
        |WHERE CASE WHEN (name = 'Legolas') THEN
        | CASE WHEN (name = 'Elf') THEN 'Elf' ELSE 'Wizard' END
        | ELSE 'Sauron' END = name
        |""".stripMargin
  val df = sql(sparkQuery)

  // CASE branches that yield strings are expected to be pushed down without any IIF wrapping.
  // scalastyle:off
  val expectedPushedQuery =
    """SELECT "dept","name","salary","bonus" FROM "employee" WHERE ("name" IS NOT NULL) AND ((CASE WHEN "name" = 'Legolas' THEN CASE WHEN "name" = 'Elf' THEN 'Elf' ELSE 'Wizard' END ELSE 'Sauron' END) = "name") """
  // scalastyle:on
  val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)
  assert(pushedQuery == expectedPushedQuery)

  // Also run the query so that the generated statement is actually executed.
  df.collect()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.sql.Connection
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.DatabaseOnDocker
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -123,4 +124,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
)
}
}

test("SPARK-49695: Postgres fix xor push-down") {
  // `dept ^ 6 = 0` can only hold for dept = 6, so exactly one employee row is expected.
  val xorQuery = s"select dept, name from $catalogName.employee where dept ^ 6 = 0"
  val df = spark.sql(xorQuery)
  val collected = df.collect()

  // No FilterExec may remain in the Spark plan, i.e. the predicate is not evaluated by Spark.
  val hasSparkSideFilter = df.queryExecution.sparkPlan.exists(_.isInstanceOf[FilterExec])
  assert(!hasSparkSideFilter)

  assert(collected.length == 1)
  val onlyRow = collected(0)
  assert(onlyRow.getInt(0) === 6)
  assert(onlyRow.getString(1) === "jen")
}
}
Loading

0 comments on commit fc601e2

Please sign in to comment.