Skip to content

Commit

Permalink
[SPARK-49297][CORE][TESTS] Fix race condition in BlockManagerDecommis…
Browse files Browse the repository at this point in the history
…sionIntegrationSuite

### What changes were proposed in this pull request?

Fixing race condition in the test "SPARK-46957: Migrated shuffle files should be able to cleanup from executor" of `BlockManagerDecommissionIntegrationSuite`.

There are at least two race conditions in this test:

1) The `SparkListener` is running in a different thread from the main thread of the unit test so `shuffleBlockUpdates` must be accessed from synchronised block.  For this `ConcurrentLinkedQueue` is used similarly to `taskEndEvents` in the `runDecomTest()` method. There was not reported error for this yet.

2) The `SparkListener` is informed earlier about a removed executor than the block manager `stop()` is called/finished. So latter when the test recursively iterating over the files to find the shuffle files the block manager's `stop()` might be running and deleting the underlying files recursively via the disk block manager. This leads to `java.nio.file.NoSuchFileException`, like:

```
- SPARK-46957: Migrated shuffle files should be able to cleanup from executor *** FAILED ***
18848  java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /home/runner/work/spark/spark/core/target/tmp/spark-87f59bc6-b996-42cd-9775-2f704b67f773/executor-e0a030d4-434b-46b3-bdbf-81d4908bb0f5/blockmgr-d88ed5dd-c1c8-4713-9433-10694e736a8e/3a
18849  at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
18850  at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
18851  at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
18852  at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
18853  at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
18854  at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
18855  at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
18856  at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
18857  at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
18858  at org.apache.commons.io.FileUtils.toList(FileUtils.java:3025)
18859  ...
18860  Cause: java.nio.file.NoSuchFileException: /home/runner/work/spark/spark/core/target/tmp/spark-87f59bc6-b996-42cd-9775-2f704b67f773/executor-e0a030d4-434b-46b3-bdbf-81d4908bb0f5/blockmgr-d88ed5dd-c1c8-4713-9433-10694e736a8e/3a
18861  at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
18862  at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
18863  at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
18864  at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
18865  at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:171)
18866  at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
18867  at java.base/java.nio.file.Files.readAttributes(Files.java:1853)
18868  at java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:226)
18869  at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:277)
18870  at java.base/java.nio.file.FileTreeWalker.next(FileTreeWalker.java:374)
18871  ...
```

To address this issue the local directory used by the decommissioned executor was find out and the wait is changed from getting the `ExecutorRemoved` event  to  wait for  the delete of this directory to be finished. This way the directory tree walked after the delete.

Disclaimer:

The jira also mentions failures like:
```
[info] - SPARK-46957: Migrated shuffle files should be able to cleanup from executor *** FAILED *** (35 seconds, 200 milliseconds)
15718[info]   0 was not greater than or equal to 4 (BlockManagerDecommissionIntegrationSuite.scala:423)
15719[info]   org.scalatest.exceptions.TestFailedException:
15720[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
15721[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
15722[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
15723[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
15724[info]   at org.apache.spark.storage.BlockManagerDecommissionIntegrationSuite.$anonfun$new$10(BlockManagerDecommissionIntegrationSuite.scala:423)
```

In my reproduction only the failure with `java.nio.file.NoSuchFileException` was  coming so it might be there are still something to fix even after this changes.

### Why are the changes needed?

As Maven daily test with Java 21 was failing from time to time (flaky because of the race condition).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It was tested by running the existing unit test repeatedly.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48683 from attilapiros/SPARK-49297-3.

Lead-authored-by: attilapiros <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
attilapiros and dongjoon-hyun committed Oct 29, 2024
1 parent d371ec0 commit 87f5a86
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.storage

import java.io.File
import java.nio.file.{Files, Paths}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -377,20 +378,22 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
var isDecommissionedExecutorRemoved = false
val shuffleBlockUpdates = new ConcurrentLinkedQueue[BlockId]()
val execToDecommission = sc.getExecutorIds().head
val decommissionedExecutorLocalDir = sc.parallelize(1 to 100, 10).flatMap { _ =>
if (SparkEnv.get.executorId == execToDecommission) {
SparkEnv.get.blockManager.getLocalDiskDirs
} else {
Array.empty[String]
}
}.collect().toSet
assert(decommissionedExecutorLocalDir.size == 1)
sc.addSparkListener(new SparkListener {
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
shuffleBlockUpdates.add(blockUpdated.blockUpdatedInfo.blockId)
}
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
assert(execToDecommission === executorRemoved.executorId)
isDecommissionedExecutorRemoved = true
}
})

// Run a job to create shuffle data
Expand All @@ -409,12 +412,13 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
)

eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(isDecommissionedExecutorRemoved)
assert(Files.notExists(Paths.get(decommissionedExecutorLocalDir.head)))
// Ensure there are shuffle data have been migrated
assert(shuffleBlockUpdates.size >= 2)
}

val shuffleId = shuffleBlockUpdates
.asScala
.find(_.isInstanceOf[ShuffleIndexBlockId])
.map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
.get
Expand Down

0 comments on commit 87f5a86

Please sign in to comment.