feat: Add support for LZ4 compression #1181

Closed
wants to merge 22 commits into from
17 changes: 8 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -272,18 +272,17 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc("The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd " +
+        "are supported. Compression can be disabled by setting spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4"))
+      .createWithDefault("lz4")
 
   val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
-      .doc("The compression level to use when compression shuffle files.")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
       .intConf
      .createWithDefault(1)


This file was deleted.

4 changes: 2 additions & 2 deletions docs/source/user-guide/configs.md
@@ -50,8 +50,8 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
-| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.level | The compression level to use when compressing shuffle files with zstd. | 1 |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
21 changes: 1 addition & 20 deletions native/Cargo.lock


2 changes: 1 addition & 1 deletion native/core/Cargo.toml
@@ -55,7 +55,7 @@ jni = "0.21"
 snap = "1.1"
 brotli = "3.3"
 flate2 = "1.0"
-lz4 = "1.24"
+lz4_flex = "0.11.3"
 zstd = "0.11"
 rand = { workspace = true}
 num = { workspace = true }
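The dependency change above swaps the `lz4` crate (bindings to the C library) for the pure-Rust `lz4_flex`. As a quick orientation, here is a minimal round-trip through lz4_flex's frame format; the payload and the `main` function are illustrative only, not taken from the PR:

```rust
use std::io::{Read, Write};

use lz4_flex::frame::{FrameDecoder, FrameEncoder};

fn main() -> std::io::Result<()> {
    let payload = b"comet shuffle bytes ".repeat(1024);

    // Compress into an in-memory LZ4 frame; FrameEncoder wraps any Write.
    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(&payload)?;
    let compressed = encoder.finish().map_err(std::io::Error::other)?;

    // FrameDecoder wraps any Read and streams the decompressed bytes back.
    let mut decoder = FrameDecoder::new(compressed.as_slice());
    let mut roundtrip = Vec::new();
    decoder.read_to_end(&mut roundtrip)?;
    assert_eq!(roundtrip, payload);
    Ok(())
}
```

The frame format is self-delimiting, so a reader can stop at the end marker without knowing the compressed length up front, which suits streaming shuffle blocks.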
43 changes: 32 additions & 11 deletions native/core/src/execution/jni_api.rs
@@ -17,6 +17,7 @@
 
 //! Define JNI APIs which can be called from Java/Scala.
 
+use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
 use datafusion::{
@@ -40,8 +41,6 @@ use jni::{
 use std::time::{Duration, Instant};
 use std::{collections::HashMap, sync::Arc, task::Poll};
 
-use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
-
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
@@ -60,6 +59,7 @@ use jni::{
 use tokio::runtime::Runtime;
 
 use crate::execution::operators::ScanExec;
+use crate::execution::shuffle::read_ipc_compressed;
 use crate::execution::spark_plan::SparkPlan;
 use log::info;

@@ -95,7 +95,7 @@ struct ExecutionContext {
 
 /// Accept serialized query plan and return the address of the native query plan.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     e: JNIEnv,
@@ -231,7 +231,7 @@ fn prepare_output(
     array_addrs: jlongArray,
     schema_addrs: jlongArray,
     output_batch: RecordBatch,
-    exec_context: &mut ExecutionContext,
+    debug_native: bool,
 ) -> CometResult<jlong> {
     let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
     let num_cols = env.get_array_length(&array_address_array)? as usize;
@@ -255,7 +255,7 @@
         )));
     }
 
-    if exec_context.debug_native {
+    if debug_native {
         // Validate the output arrays.
         for array in results.iter() {
             let array_data = array.to_data();
@@ -275,9 +275,6 @@
         i += 1;
     }
 
-    // Update metrics
-    update_metrics(env, exec_context)?;
-
     Ok(num_rows as jlong)
 }

@@ -298,7 +295,7 @@ fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometEr
 /// Accept serialized query plan and the addresses of Arrow Arrays from Spark,
 /// then execute the query. Return addresses of arrow vector.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     e: JNIEnv,
@@ -358,12 +355,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
 
         match poll_output {
             Poll::Ready(Some(output)) => {
+                // Update metrics
+                update_metrics(&mut env, exec_context)?;
                 return prepare_output(
                     &mut env,
                     array_addrs,
                     schema_addrs,
                     output?,
-                    exec_context,
+                    exec_context.debug_native,
                 );
             }
             Poll::Ready(None) => {
@@ -459,7 +458,7 @@ fn get_execution_context<'a>(id: i64) -> &'a mut ExecutionContext {
 
 /// Used by Comet shuffle external sorter to write sorted records to disk.
 /// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
 pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative(
     e: JNIEnv,
@@ -544,3 +543,25 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
         Ok(())
     })
 }
+
+#[no_mangle]
+/// Used by Comet native shuffle reader
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
+pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock(
+    e: JNIEnv,
+    _class: JClass,
+    byte_array: jbyteArray,
+    array_addrs: jlongArray,
+    schema_addrs: jlongArray,
+) -> jlong {
+    try_unwrap_or_throw(&e, |mut env| {
+        let value_array = unsafe { JPrimitiveArray::from_raw(byte_array) };
+        let length = env.get_array_length(&value_array)?;
+        let elements = unsafe { env.get_array_elements(&value_array, ReleaseMode::NoCopyBack)? };
+        let raw_pointer = elements.as_ptr();
+        let slice = unsafe { std::slice::from_raw_parts(raw_pointer, length as usize) };
+        let batch = read_ipc_compressed(slice)?;
+        prepare_output(&mut env, array_addrs, schema_addrs, batch, false)
+    })
+}
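`decodeShuffleBlock` borrows the JNI byte array in place (`ReleaseMode::NoCopyBack`, so nothing is copied back to the JVM) and hands the raw bytes to `read_ipc_compressed`. That function lives in `shuffle_writer.rs` and is not shown in this diff; purely as a hedged sketch, a decoder of that shape could look like the following, assuming each block is an LZ4-framed Arrow IPC stream carrying a single batch:

```rust
use arrow::ipc::reader::StreamReader;
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use lz4_flex::frame::FrameDecoder;

// Hypothetical stand-in for read_ipc_compressed: decompress the block and
// read one RecordBatch out of the embedded Arrow IPC stream. The actual
// framing used by Comet may differ.
fn decode_block(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
    let decoder = FrameDecoder::new(bytes);
    let mut reader = StreamReader::try_new(decoder, None)?;
    reader
        .next()
        .transpose()?
        .ok_or_else(|| ArrowError::ParseError("empty shuffle block".to_string()))
}
```

The decoded batch is then exported back through the `array_addrs`/`schema_addrs` pointers by `prepare_output`, with `debug_native` hardcoded to `false` since no `ExecutionContext` exists on this path.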
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
@@ -1055,6 +1055,7 @@ impl PhysicalPlanner {
                 Ok(SparkCompressionCodec::Zstd) => {
                     Ok(CompressionCodec::Zstd(writer.compression_level))
                 }
+                Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame),
                 _ => Err(ExecutionError::GeneralError(format!(
                     "Unsupported shuffle compression codec: {:?}",
                     writer.codec
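This match is the single point where the Spark-level codec setting reaches the native side; downstream code only sees the `CompressionCodec` enum. The sketch below shows one way a writer could wrap its output stream per variant; `wrap_compressed` is illustrative, and the PR's actual writer in `shuffle_writer.rs` may be structured differently:

```rust
use std::io::Write;

// Mirrors the enum shape used in planner.rs: Zstd carries the level from
// spark.comet.exec.shuffle.compression.level, while the LZ4 frame format
// in lz4_flex has no level knob (which is why the level config is now
// documented as zstd-only).
enum CompressionCodec {
    Zstd(i32),
    Lz4Frame,
}

// Illustrative helper: wrap a sink in the matching compressing writer.
fn wrap_compressed<'a, W: Write + 'a>(
    codec: &CompressionCodec,
    sink: W,
) -> std::io::Result<Box<dyn Write + 'a>> {
    match codec {
        CompressionCodec::Zstd(level) => {
            // auto_finish() writes the zstd epilogue when the encoder drops.
            Ok(Box::new(zstd::Encoder::new(sink, *level)?.auto_finish()))
        }
        // Real code should call finish() on the FrameEncoder to emit the
        // LZ4 end marker before dropping it.
        CompressionCodec::Lz4Frame => Ok(Box::new(lz4_flex::frame::FrameEncoder::new(sink))),
    }
}
```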
4 changes: 3 additions & 1 deletion native/core/src/execution/shuffle/mod.rs
@@ -19,4 +19,6 @@ mod list;
 mod map;
 pub mod row;
 mod shuffle_writer;
-pub use shuffle_writer::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
+pub use shuffle_writer::{
+    read_ipc_compressed, write_ipc_compressed, CompressionCodec, ShuffleWriterExec,
+};