feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support (#1192)

* Implement native decoding and decompression

* revert some variable renaming for smaller diff

* fix oom issues?

* make NativeBatchDecoderIterator more consistent with ArrowReaderIterator

* fix oom and prep for review

* format

* Add LZ4 support

* clippy, new benchmark

* rename metrics, clean up lz4 code

* update test

* Add support for snappy

* format

* change default back to lz4

* make metrics more accurate

* format

* clippy

* use faster unsafe version of lz4_flex

* Make compression codec configurable for columnar shuffle

* clippy

* fix bench

* fmt

* address feedback

* address feedback

* address feedback

* minor code simplification

* cargo fmt

* overflow check

* rename compression level config

* address feedback

* address feedback

* rename constant
andygrove authored on Jan 7, 2025 · 1 parent e72beb1 · commit 74a6a8d
Showing 23 changed files with 524 additions and 231 deletions.
23 changes: 12 additions & 11 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -272,18 +272,19 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc(
+        "The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
+          "snappy are supported. Compression can be disabled by setting " +
+          "spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4", "snappy"))
+      .createWithDefault("lz4")

-  val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
-    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
-      .doc("The compression level to use when compression shuffle files.")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.zstd.level")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
       .intConf
       .createWithDefault(1)

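For illustration, here is a hypothetical sketch (in Rust, not taken from the Comet codebase) of how these Spark-side settings could be resolved into a native codec enum. The variant names mirror the `CompressionCodec` variants that appear in the benchmark diffs below; the `resolve_codec` helper and its signature are invented for this example.

```rust
// Hypothetical sketch only -- not the Comet implementation.
// The enum variants mirror those used in the shuffle writer benchmarks below.
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionCodec {
    None,
    Lz4Frame,
    Snappy,
    Zstd(i32), // zstd level, e.g. from spark.comet.exec.shuffle.compression.zstd.level
}

/// Resolve the Spark-side settings into a codec. `shuffle_compress` corresponds to
/// spark.shuffle.compress; `codec` and `zstd_level` correspond to the two Comet configs above.
pub fn resolve_codec(
    shuffle_compress: bool,
    codec: &str,
    zstd_level: i32,
) -> Result<CompressionCodec, String> {
    if !shuffle_compress {
        // Compression disabled entirely via spark.shuffle.compress=false
        return Ok(CompressionCodec::None);
    }
    match codec {
        "lz4" => Ok(CompressionCodec::Lz4Frame),
        "snappy" => Ok(CompressionCodec::Snappy),
        "zstd" => Ok(CompressionCodec::Zstd(zstd_level)),
        other => Err(format!("unsupported shuffle compression codec: {other}")),
    }
}
```

With the defaults above (`lz4`, and `spark.shuffle.compress` left at its default of `true`), this resolves to `CompressionCodec::Lz4Frame`.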
4 changes: 2 additions & 2 deletions docs/source/user-guide/configs.md
@@ -50,8 +50,8 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy_task_shared'. | greedy_task_shared |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
-| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.zstd.level | The compression level to use when compressing shuffle files with zstd. | 1 |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
48 changes: 25 additions & 23 deletions native/Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions native/core/Cargo.toml
@@ -52,6 +52,9 @@ serde = { version = "1", features = ["derive"] }
 lazy_static = "1.4.0"
 prost = "0.12.1"
 jni = "0.21"
+snap = "1.1"
+# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation
+lz4_flex = { version = "0.11.3", default-features = false }
 zstd = "0.11"
 rand = { workspace = true}
 num = { workspace = true }
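To show roughly how these three crates are invoked, here is an illustrative sketch (not the Comet `write_ipc_compressed` implementation) that compresses an in-memory buffer with each codec. It assumes the Snappy raw block format rather than the framed format, and it assumes lz4_flex's `frame` feature is enabled, since the frame module is gated behind that feature.

```rust
// Illustrative helpers only -- not the Comet shuffle writer. Each function
// compresses an in-memory buffer with one of the crates added above.
use std::io::Write;

fn compress_snappy(input: &[u8]) -> std::io::Result<Vec<u8>> {
    // snap's raw block format; snap::write::FrameEncoder provides the framed format instead
    snap::raw::Encoder::new()
        .compress_vec(input)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

fn compress_lz4_frame(input: &[u8]) -> std::io::Result<Vec<u8>> {
    // LZ4 frame format via lz4_flex (requires the crate's `frame` feature)
    let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
    encoder.write_all(input)?;
    encoder
        .finish()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

fn compress_zstd(input: &[u8], level: i32) -> std::io::Result<Vec<u8>> {
    // zstd streaming encoder; `level` corresponds to the zstd compression level config
    let mut encoder = zstd::stream::write::Encoder::new(Vec::new(), level)?;
    encoder.write_all(input)?;
    encoder.finish()
}
```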
2 changes: 2 additions & 0 deletions native/core/benches/row_columnar.rs
@@ -19,6 +19,7 @@ use arrow::datatypes::DataType as ArrowDataType;
 use comet::execution::shuffle::row::{
     process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
 };
+use comet::execution::shuffle::CompressionCodec;
 use criterion::{criterion_group, criterion_main, Criterion};
 use tempfile::Builder;

@@ -77,6 +78,7 @@ fn benchmark(c: &mut Criterion) {
                 false,
                 0,
                 None,
+                &CompressionCodec::Zstd(1),
             )
             .unwrap();
         });
41 changes: 35 additions & 6 deletions native/core/benches/shuffle_writer.rs
@@ -35,23 +35,52 @@ fn criterion_benchmark(c: &mut Criterion) {
     group.bench_function("shuffle_writer: encode (no compression))", |b| {
         let batch = create_batch(8192, true);
         let mut buffer = vec![];
-        let mut cursor = Cursor::new(&mut buffer);
         let ipc_time = Time::default();
-        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time));
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)
+        });
     });
+    group.bench_function("shuffle_writer: encode and compress (snappy)", |b| {
+        let batch = create_batch(8192, true);
+        let mut buffer = vec![];
+        let ipc_time = Time::default();
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Snappy, &ipc_time)
+        });
+    });
+    group.bench_function("shuffle_writer: encode and compress (lz4)", |b| {
+        let batch = create_batch(8192, true);
+        let mut buffer = vec![];
+        let ipc_time = Time::default();
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
+        });
+    });
     group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
         let batch = create_batch(8192, true);
         let mut buffer = vec![];
-        let mut cursor = Cursor::new(&mut buffer);
         let ipc_time = Time::default();
-        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time));
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)
+        });
     });
     group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
         let batch = create_batch(8192, true);
         let mut buffer = vec![];
-        let mut cursor = Cursor::new(&mut buffer);
         let ipc_time = Time::default();
-        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time));
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)
+        });
     });
     group.bench_function("shuffle_writer: end to end", |b| {
         let ctx = SessionContext::new();
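Since this change also moves shuffle block decompression into native code, the read side would use the matching decoders from the same crates. The sketch below is illustrative only (it is not the NativeBatchDecoderIterator referenced in the commit messages above) and makes the same assumptions as the compression sketch earlier: raw Snappy blocks, and lz4_flex with its `frame` feature enabled.

```rust
// Illustrative decompression counterparts -- not the Comet decoder implementation.
use std::io::Read;

fn decompress_snappy(input: &[u8]) -> std::io::Result<Vec<u8>> {
    // Raw Snappy block format, matching the raw encoder sketched earlier
    snap::raw::Decoder::new()
        .decompress_vec(input)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

fn decompress_lz4_frame(input: &[u8]) -> std::io::Result<Vec<u8>> {
    // LZ4 frame format; FrameDecoder implements std::io::Read
    let mut output = Vec::new();
    lz4_flex::frame::FrameDecoder::new(input).read_to_end(&mut output)?;
    Ok(output)
}

fn decompress_zstd(input: &[u8]) -> std::io::Result<Vec<u8>> {
    // zstd streaming decoder over an in-memory slice
    let mut output = Vec::new();
    zstd::stream::read::Decoder::new(input)?.read_to_end(&mut output)?;
    Ok(output)
}
```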
(Diffs for the remaining changed files are not shown here.)
