Add support for LZ4 compression
andygrove committed Dec 21, 2024
1 parent ea6d205 commit 291d8f1
Showing 30 changed files with 326 additions and 236 deletions.
17 changes: 8 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -272,18 +272,17 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc("The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd " +
+        "are supported. Compression can be disabled by setting spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4"))
+      .createWithDefault("lz4")
 
   val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
-      .doc("The compression level to use when compression shuffle files.")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
       .intConf
      .createWithDefault(1)



4 changes: 2 additions & 2 deletions docs/source/user-guide/configs.md
@@ -50,8 +50,8 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
-| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.level | The compression level to use when compressing shuffle files with zstd. | 1 |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
21 changes: 1 addition & 20 deletions native/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion native/core/Cargo.toml
@@ -55,7 +55,7 @@ jni = "0.21"
 snap = "1.1"
 brotli = "3.3"
 flate2 = "1.0"
-lz4 = "1.24"
+lz4_flex = "0.11.3"
 zstd = "0.11"
 rand = { workspace = true}
 num = { workspace = true }
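
The dependency swap above replaces the `lz4` crate (C bindings) with the pure-Rust `lz4_flex`. A minimal round trip with its frame API, which reads and writes standard LZ4 frames, can look like the following standalone sketch (illustrative only, not code from this commit):

```rust
// Minimal lz4_flex frame round trip; assumes lz4_flex 0.11 with its default
// features (which include the `frame` module).
use std::io::{Read, Write};

use lz4_flex::frame::{FrameDecoder, FrameEncoder};

fn main() -> std::io::Result<()> {
    let payload = b"comet shuffle bytes";

    // Compress into an in-memory LZ4 frame.
    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(payload)?;
    let compressed = encoder
        .finish()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

    // Decompress and verify the round trip.
    let mut decoder = FrameDecoder::new(compressed.as_slice());
    let mut restored = Vec::new();
    decoder.read_to_end(&mut restored)?;
    assert_eq!(payload.as_slice(), restored.as_slice());
    Ok(())
}
```
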
43 changes: 32 additions & 11 deletions native/core/src/execution/jni_api.rs
@@ -17,6 +17,7 @@
 
 //! Define JNI APIs which can be called from Java/Scala.
+use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
 use datafusion::{
@@ -40,8 +41,6 @@ use jni::{
 use std::time::{Duration, Instant};
 use std::{collections::HashMap, sync::Arc, task::Poll};
 
-use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
-
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
@@ -60,6 +59,7 @@ use jni::{
 use tokio::runtime::Runtime;
 
 use crate::execution::operators::ScanExec;
+use crate::execution::shuffle::read_ipc_compressed;
 use crate::execution::spark_plan::SparkPlan;
 use log::info;

@@ -95,7 +95,7 @@ struct ExecutionContext {
 
 /// Accept serialized query plan and return the address of the native query plan.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     e: JNIEnv,
@@ -231,7 +231,7 @@ fn prepare_output(
     array_addrs: jlongArray,
     schema_addrs: jlongArray,
     output_batch: RecordBatch,
-    exec_context: &mut ExecutionContext,
+    debug_native: bool,
 ) -> CometResult<jlong> {
     let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
     let num_cols = env.get_array_length(&array_address_array)? as usize;
@@ -255,7 +255,7 @@
         )));
     }
 
-    if exec_context.debug_native {
+    if debug_native {
         // Validate the output arrays.
         for array in results.iter() {
             let array_data = array.to_data();
@@ -275,9 +275,6 @@
         i += 1;
     }
 
-    // Update metrics
-    update_metrics(env, exec_context)?;
-
     Ok(num_rows as jlong)
 }

@@ -298,7 +295,7 @@ fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometError> {
 /// Accept serialized query plan and the addresses of Arrow Arrays from Spark,
 /// then execute the query. Return addresses of arrow vector.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     e: JNIEnv,
@@ -358,12 +355,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
 
         match poll_output {
             Poll::Ready(Some(output)) => {
+                // Update metrics
+                update_metrics(&mut env, exec_context)?;
                 return prepare_output(
                     &mut env,
                     array_addrs,
                     schema_addrs,
                     output?,
-                    exec_context,
+                    exec_context.debug_native,
                 );
             }
             Poll::Ready(None) => {
@@ -459,7 +458,7 @@ fn get_execution_context<'a>(id: i64) -> &'a mut ExecutionContext {
 
 /// Used by Comet shuffle external sorter to write sorted records to disk.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative(
     e: JNIEnv,
@@ -544,3 +543,25 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
         Ok(())
     })
 }
+
+#[no_mangle]
+/// Used by Comet native shuffle reader
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
+pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock(
+    e: JNIEnv,
+    _class: JClass,
+    byte_array: jbyteArray,
+    array_addrs: jlongArray,
+    schema_addrs: jlongArray,
+) -> jlong {
+    try_unwrap_or_throw(&e, |mut env| {
+        let value_array = unsafe { JPrimitiveArray::from_raw(byte_array) };
+        let length = env.get_array_length(&value_array)?;
+        let elements = unsafe { env.get_array_elements(&value_array, ReleaseMode::NoCopyBack)? };
+        let raw_pointer = elements.as_ptr();
+        let slice = unsafe { std::slice::from_raw_parts(raw_pointer, length as usize) };
+        let batch = read_ipc_compressed(slice)?;
+        prepare_output(&mut env, array_addrs, schema_addrs, batch, false)
+    })
+}
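
The new `decodeShuffleBlock` entry point hands the JNI byte slice to `read_ipc_compressed` and ships the resulting batch back through `prepare_output`. As a rough sketch of what that decode step can look like, assuming a block is simply an LZ4 frame wrapping a single-batch Arrow IPC stream (the function name and exact framing here are illustrative assumptions, not this commit's implementation, which lives in shuffle_writer.rs):

```rust
// Hedged sketch: decode one RecordBatch from an LZ4-framed Arrow IPC stream.
// The block layout is an assumption made for illustration.
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use arrow::record_batch::RecordBatch;
use lz4_flex::frame::FrameDecoder;

fn decode_block(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
    // Stream the compressed bytes through the LZ4 frame decoder...
    let decoder = FrameDecoder::new(bytes);
    // ...and let the Arrow IPC stream reader pull the batch out of it.
    let mut reader = StreamReader::try_new(decoder, None)?;
    reader
        .next()
        .ok_or_else(|| ArrowError::IpcError("expected one batch in shuffle block".to_string()))?
}
```
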
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
@@ -1055,6 +1055,7 @@ impl PhysicalPlanner {
                 Ok(SparkCompressionCodec::Zstd) => {
                     Ok(CompressionCodec::Zstd(writer.compression_level))
                 }
+                Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame),
                 _ => Err(ExecutionError::GeneralError(format!(
                     "Unsupported shuffle compression codec: {:?}",
                     writer.codec
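
This one-line planner change is the whole control-flow story on the write side: `lz4` maps to a new `Lz4Frame` codec variant, and only zstd carries the configurable level, matching the updated config docs above. A standalone sketch of that mapping (the enum and helper shapes are assumptions for illustration, not the commit's definitions):

```rust
// Hedged sketch of the codec selection the planner performs; the variant
// shapes (a level for zstd, none for lz4) mirror the config docs above.
#[derive(Debug, PartialEq)]
enum CompressionCodec {
    Zstd(i32),
    Lz4Frame,
}

fn codec_from_conf(name: &str, level: i32) -> Result<CompressionCodec, String> {
    match name {
        "zstd" => Ok(CompressionCodec::Zstd(level)),
        "lz4" => Ok(CompressionCodec::Lz4Frame),
        other => Err(format!("Unsupported shuffle compression codec: {other:?}")),
    }
}

fn main() {
    // lz4 is the new default codec; the level config only affects zstd.
    assert_eq!(codec_from_conf("lz4", 1), Ok(CompressionCodec::Lz4Frame));
    assert_eq!(codec_from_conf("zstd", 3), Ok(CompressionCodec::Zstd(3)));
    assert!(codec_from_conf("snappy", 1).is_err());
}
```
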
4 changes: 3 additions & 1 deletion native/core/src/execution/shuffle/mod.rs
@@ -19,4 +19,6 @@ mod list;
 mod map;
 pub mod row;
 mod shuffle_writer;
-pub use shuffle_writer::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
+pub use shuffle_writer::{
+    read_ipc_compressed, write_ipc_compressed, CompressionCodec, ShuffleWriterExec,
+};
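
With `read_ipc_compressed` now exported alongside `write_ipc_compressed`, both halves of the compressed-IPC block format sit behind one module boundary. The encode direction can be pictured as the mirror image of the decode sketch earlier, under the same LZ4-frame-around-IPC layout assumption (this is illustrative and not the exported function's signature):

```rust
// Hedged sketch of the encode direction: an Arrow IPC stream whose sink is an
// LZ4 frame encoder. Mirrors the decode sketch under the same assumption.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;
use lz4_flex::frame::FrameEncoder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a tiny single-column batch to encode.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![col])?;

    // Write the batch as an IPC stream directly into the LZ4 frame encoder.
    let mut writer = StreamWriter::try_new(FrameEncoder::new(Vec::new()), &schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    let compressed: Vec<u8> = writer.into_inner()?.finish()?;

    println!("{} rows -> {} compressed bytes", batch.num_rows(), compressed.len());
    Ok(())
}
```
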