[SPARK-32517][CORE] Add StorageLevel.DISK_ONLY_3
### What changes were proposed in this pull request?

This PR aims to add `StorageLevel.DISK_ONLY_3` as a built-in `StorageLevel`.

### Why are the changes needed?

In a YARN cluster, HDFS usually provides storage with a replication factor of 3, so users can technically get the effect of `StorageLevel.DISK_ONLY_3` by saving results to HDFS. However, disaggregated clusters and clusters without a storage service are on the rise. Previously, in that situation, users had to fall back on the similar `MEMORY_AND_DISK_2` or a user-created `StorageLevel`. This PR supports those use cases officially for a better UX.
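
For illustration, here is the workaround next to the new constant; a minimal Scala sketch, assuming a running `SparkContext` named `sc` (the value names are ours):

```scala
import org.apache.spark.storage.StorageLevel

// Previously: assemble a disk-only, 3x-replicated level by hand.
val diskOnly3ByHand = StorageLevel(
  useDisk = true, useMemory = false, useOffHeap = false,
  deserialized = false, replication = 3)

// With this PR: the equivalent built-in constant.
val cached = sc.parallelize(1 to 1000).persist(StorageLevel.DISK_ONLY_3)
```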

### Does this PR introduce _any_ user-facing change?

Yes. This provides a new built-in option.

### How was this patch tested?

Pass GitHub Actions or Jenkins with the revised test cases.

Closes apache#29331 from dongjoon-hyun/SPARK-32517.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Aug 10, 2020
1 parent f80a480 commit b421bf0
Showing 10 changed files with 17 additions and 4 deletions.
4 changes: 4 additions & 0 deletions R/pkg/R/utils.R
@@ -376,6 +376,7 @@ varargsToStrEnv <- function(...) {

getStorageLevel <- function(newLevel = c("DISK_ONLY",
"DISK_ONLY_2",
"DISK_ONLY_3",
"MEMORY_AND_DISK",
"MEMORY_AND_DISK_2",
"MEMORY_AND_DISK_SER",
@@ -390,6 +391,7 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
storageLevel <- switch(newLevel,
"DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
"DISK_ONLY_3" = callJStatic(storageLevelClass, "DISK_ONLY_3"),
"MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
@@ -415,6 +417,8 @@ storageLevelToString <- function(levelObj) {
"DISK_ONLY"
} else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
"DISK_ONLY_2"
+ } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 3) {
+   "DISK_ONLY_3"
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
"MEMORY_ONLY"
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -26,6 +26,7 @@ public class StorageLevels {
public static final StorageLevel NONE = create(false, false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
+ public static final StorageLevel DISK_ONLY_3 = create(true, false, false, false, 3);
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
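
The five arguments to `create` above are `useDisk`, `useMemory`, `useOffHeap`, `deserialized`, and `replication`, so the new constant is disk-only, serialized, with three replicas. A quick Scala sketch of those semantics (the `description` string shown is indicative, not verbatim):

```scala
import org.apache.spark.storage.StorageLevel

val lvl = StorageLevel.DISK_ONLY_3
assert(lvl.useDisk && !lvl.useMemory && !lvl.useOffHeap && !lvl.deserialized)
assert(lvl.replication == 3)
println(lvl.description)  // e.g. "Disk Serialized 3x Replicated"
```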
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -153,6 +153,7 @@ object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
+ val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
@@ -172,6 +173,7 @@
case "NONE" => NONE
case "DISK_ONLY" => DISK_ONLY
case "DISK_ONLY_2" => DISK_ONLY_2
case "DISK_ONLY_3" => DISK_ONLY_3
case "MEMORY_ONLY" => MEMORY_ONLY
case "MEMORY_ONLY_2" => MEMORY_ONLY_2
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
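
The string form round-trips through the parser updated above; a small sketch:

```scala
import org.apache.spark.storage.StorageLevel

// The new name is now parsed to the built-in constant...
assert(StorageLevel.fromString("DISK_ONLY_3") == StorageLevel.DISK_ONLY_3)

// ...while unknown names still fail fast.
// StorageLevel.fromString("DISK_ONLY_4")  // throws IllegalArgumentException
```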
7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -38,7 +38,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler

- val clusterUrl = "local-cluster[2,1,1024]"
+ val clusterUrl = "local-cluster[3,1,1024]"

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
@@ -174,7 +174,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext

private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
- TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+ TestUtils.waitUntilExecutorsUp(sc, 3, 60000)
val data = sc.parallelize(1 to 1000, 10)
val cachedData = data.persist(storageLevel)
assert(cachedData.count === 1000)
@@ -206,7 +206,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
"caching on disk" -> StorageLevel.DISK_ONLY,
"caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
"caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
"caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
"caching on disk, replicated 2" -> StorageLevel.DISK_ONLY_2,
"caching on disk, replicated 3" -> StorageLevel.DISK_ONLY_3,
"caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
"caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
).foreach { case (testName, storageLevel) =>
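
The test cluster grows from two workers to three because a replication factor of 3 needs at least three executors to place every replica. A self-contained sketch of the pattern the suite exercises (app name and sizes are ours):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setMaster("local-cluster[3,1,1024]")  // three executors, 1 core / 1024 MB each
  .setAppName("disk-only-3-demo")
val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_3)
assert(data.count() == 1000)  // materializes three on-disk replicas per block
sc.stop()
```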
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -44,6 +44,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
assert(transform(StorageLevel.MEMORY_ONLY_SER_2) === StorageLevel.MEMORY_AND_DISK_SER_2)
assert(transform(StorageLevel.DISK_ONLY) === StorageLevel.DISK_ONLY)
assert(transform(StorageLevel.DISK_ONLY_2) === StorageLevel.DISK_ONLY_2)
+ assert(transform(StorageLevel.DISK_ONLY_3) === StorageLevel.DISK_ONLY_3)
assert(transform(StorageLevel.MEMORY_AND_DISK) === StorageLevel.MEMORY_AND_DISK)
assert(transform(StorageLevel.MEMORY_AND_DISK_SER) === StorageLevel.MEMORY_AND_DISK_SER)
assert(transform(StorageLevel.MEMORY_AND_DISK_2) === StorageLevel.MEMORY_AND_DISK_2)
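
Local checkpointing reuses the RDD's storage level, and the transform tested above passes the new level through unchanged since it already includes disk. A hedged usage sketch (assuming a `SparkContext` named `sc`):

```scala
import org.apache.spark.storage.StorageLevel

// An RDD persisted with the new level keeps it when marked for local
// checkpointing; no fallback to a MEMORY_AND_DISK variant is required.
val rdd = sc.parallelize(1 to 100).persist(StorageLevel.DISK_ONLY_3)
rdd.localCheckpoint()
rdd.count()  // action triggers the checkpoint
```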
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -170,6 +170,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testStorageLevel(StorageLevel.NONE)
testStorageLevel(StorageLevel.DISK_ONLY)
testStorageLevel(StorageLevel.DISK_ONLY_2)
+ testStorageLevel(StorageLevel.DISK_ONLY_3)
testStorageLevel(StorageLevel.MEMORY_ONLY)
testStorageLevel(StorageLevel.MEMORY_ONLY_2)
testStorageLevel(StorageLevel.MEMORY_ONLY_SER)
2 changes: 1 addition & 1 deletion docs/rdd-programming-guide.md
@@ -1256,7 +1256,7 @@ storage levels is:

**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
- `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
+ `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2`, and `DISK_ONLY_3`.*

Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.

1 change: 1 addition & 0 deletions docs/sql-ref-syntax-aux-cache-cache-table.md
@@ -49,6 +49,7 @@ CACHE [ LAZY ] TABLE table_identifier
* `NONE`
* `DISK_ONLY`
* `DISK_ONLY_2`
+ * `DISK_ONLY_3`
* `MEMORY_ONLY`
* `MEMORY_ONLY_2`
* `MEMORY_ONLY_SER`
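
The new level name is likewise accepted by the `CACHE TABLE` option documented above; a sketch via the Scala API, assuming a `SparkSession` named `spark` and a hypothetical table `events`:

```scala
// Cache a table on disk with three replicas using the SQL syntax above.
spark.sql("CACHE TABLE events OPTIONS ('storageLevel' 'DISK_ONLY_3')")
```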
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.rst
@@ -249,6 +249,7 @@ Management
SparkFiles.getRootDirectory
StorageLevel.DISK_ONLY
StorageLevel.DISK_ONLY_2
+ StorageLevel.DISK_ONLY_3
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY
1 change: 1 addition & 0 deletions python/pyspark/storagelevel.py
@@ -51,6 +51,7 @@ def __str__(self):

StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
+ StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
