Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 20, 2024
1 parent b0cd32f commit b4e10cd
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

package org.apache.spark.sql.comet.execution.shuffle

import java.io.{InputStream, OutputStream}

import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{IO_COMPRESSION_CODEC, SHUFFLE_COMPRESS}
import org.apache.spark.io.CompressionCodec
import org.apache.comet.CometConf
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream

import java.io.{InputStream, OutputStream}
import org.apache.comet.CometConf

private[spark] object ShuffleUtils extends Logging {
// optional compression codec to use when compressing shuffle files
Expand Down Expand Up @@ -58,6 +59,7 @@ private[spark] object ShuffleUtils extends Logging {
object Lz4FrameCodec extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
// we only support decompressing lz4 in jvm
throw new UnsupportedOperationException()
}

Expand Down
3 changes: 2 additions & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd are supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
Expand Down
3 changes: 3 additions & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,9 @@ impl PhysicalPlanner {
Ok(SparkCompressionCodec::Zstd) => {
Ok(CompressionCodec::Zstd(writer.compression_level))
}
Ok(SparkCompressionCodec::Lz4) => {
Ok(CompressionCodec::Lz4Frame(writer.compression_level))
}
_ => Err(ExecutionError::GeneralError(format!(
"Unsupported shuffle compression codec: {:?}",
writer.codec
Expand Down
1 change: 1 addition & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ message Limit {
enum CompressionCodec {
None = 0;
Zstd = 1;
Lz4 = 2;
}

message ShuffleWriter {
Expand Down

0 comments on commit b4e10cd

Please sign in to comment.