Merge commit '846befb6a620d3b8c0c7ff01be7c35c45fb72360' into update_august_wk_3
itsjunetime committed Sep 16, 2024
2 parents 0a50c3f + 846befb commit 0c1d133
Showing 47 changed files with 826 additions and 357 deletions.
190 changes: 97 additions & 93 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -34,7 +34,7 @@ arrow = { version = "52.2.0" }
 async-trait = "0.1.73"
 aws-config = "0.55"
 aws-credential-types = "0.55"
-clap = { version = "3", features = ["derive", "cargo"] }
+clap = { version = "4.5.16", features = ["derive", "cargo"] }
 datafusion = { path = "../datafusion/core", version = "41.0.0", features = [
     "avro",
     "crypto_expressions",
2 changes: 1 addition & 1 deletion datafusion-cli/src/command.rs
@@ -22,7 +22,7 @@ use crate::exec::{exec_and_print, exec_from_lines};
 use crate::functions::{display_all_functions, Function};
 use crate::print_format::PrintFormat;
 use crate::print_options::PrintOptions;
-use clap::ArgEnum;
+use clap::ValueEnum;
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
47 changes: 19 additions & 28 deletions datafusion-cli/src/main.rs
@@ -49,55 +49,55 @@ struct Args {
         short = 'p',
         long,
         help = "Path to your data, default to current directory",
-        validator(is_valid_data_dir)
+        value_parser(parse_valid_data_dir)
     )]
     data_path: Option<String>,

     #[clap(
         short = 'b',
         long,
         help = "The batch size of each query, or use DataFusion default",
-        validator(is_valid_batch_size)
+        value_parser(parse_batch_size)
     )]
     batch_size: Option<usize>,

     #[clap(
         short = 'c',
         long,
-        multiple_values = true,
+        num_args = 0..,
         help = "Execute the given command string(s), then exit. Commands are expected to be non empty.",
-        validator(is_valid_command)
+        value_parser(parse_command)
     )]
     command: Vec<String>,

     #[clap(
         short = 'm',
         long,
         help = "The memory pool limitation (e.g. '10g'), default to None (no limit)",
-        validator(is_valid_memory_pool_size)
+        value_parser(extract_memory_pool_size)
     )]
-    memory_limit: Option<String>,
+    memory_limit: Option<usize>,

     #[clap(
         short,
         long,
-        multiple_values = true,
+        num_args = 0..,
         help = "Execute commands from file(s), then exit",
-        validator(is_valid_file)
+        value_parser(parse_valid_file)
     )]
     file: Vec<String>,

     #[clap(
         short = 'r',
         long,
-        multiple_values = true,
+        num_args = 0..,
         help = "Run the provided files on startup instead of ~/.datafusionrc",
-        validator(is_valid_file),
+        value_parser(parse_valid_file),
         conflicts_with = "file"
     )]
     rc: Option<Vec<String>>,

-    #[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)]
+    #[clap(long, value_enum, default_value_t = PrintFormat::Automatic)]
     format: PrintFormat,

     #[clap(
@@ -160,8 +160,6 @@ async fn main_inner() -> Result<()> {
     let rt_config =
         // set memory pool size
         if let Some(memory_limit) = args.memory_limit {
-            // unwrap is safe here because is_valid_memory_pool_size already checked the value
-            let memory_limit = extract_memory_pool_size(&memory_limit).unwrap();
             // set memory pool type
             match args.mem_pool_type {
                 PoolType::Fair => rt_config
@@ -235,39 +233,32 @@ fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
     RuntimeEnv::new(rn_config)
 }

-fn is_valid_file(dir: &str) -> Result<(), String> {
+fn parse_valid_file(dir: &str) -> Result<String, String> {
     if Path::new(dir).is_file() {
-        Ok(())
+        Ok(dir.to_string())
     } else {
         Err(format!("Invalid file '{}'", dir))
     }
 }

-fn is_valid_data_dir(dir: &str) -> Result<(), String> {
+fn parse_valid_data_dir(dir: &str) -> Result<String, String> {
     if Path::new(dir).is_dir() {
-        Ok(())
+        Ok(dir.to_string())
     } else {
         Err(format!("Invalid data directory '{}'", dir))
     }
 }

-fn is_valid_batch_size(size: &str) -> Result<(), String> {
+fn parse_batch_size(size: &str) -> Result<usize, String> {
     match size.parse::<usize>() {
-        Ok(size) if size > 0 => Ok(()),
+        Ok(size) if size > 0 => Ok(size),
         _ => Err(format!("Invalid batch size '{}'", size)),
     }
 }

-fn is_valid_memory_pool_size(size: &str) -> Result<(), String> {
-    match extract_memory_pool_size(size) {
-        Ok(_) => Ok(()),
-        Err(e) => Err(e),
-    }
-}
-
-fn is_valid_command(command: &str) -> Result<(), String> {
+fn parse_command(command: &str) -> Result<String, String> {
     if !command.is_empty() {
-        Ok(())
+        Ok(command.to_string())
     } else {
         Err("-c flag expects only non empty commands".to_string())
     }
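
Note: the pattern across main.rs is the clap 3 → 4 migration. clap 3's `validator` only checked the raw string and returned `Result<(), String>`, while clap 4's `value_parser` validates and converts in one step, handing the typed value to the field. That is why `memory_limit` can become `Option<usize>` and the later `extract_memory_pool_size(...).unwrap()` call disappears, and why `multiple_values = true` becomes the `num_args = 0..` range syntax. A minimal sketch of the new pattern, with illustrative names rather than the repo's actual struct:

    use clap::Parser;

    // clap 4 value_parser: validate and convert in one pass, so the
    // struct field holds the typed value and no later unwrap is needed.
    fn parse_batch_size(size: &str) -> Result<usize, String> {
        match size.parse::<usize>() {
            Ok(n) if n > 0 => Ok(n),
            _ => Err(format!("Invalid batch size '{}'", size)),
        }
    }

    #[derive(Parser, Debug)]
    struct DemoArgs {
        // clap 3: validator(is_valid_batch_size) on an Option<String>
        // clap 4: value_parser(parse_batch_size) on an Option<usize>
        #[clap(short = 'b', long, value_parser(parse_batch_size))]
        batch_size: Option<usize>,
    }

    fn main() {
        let args = DemoArgs::parse();
        println!("batch_size = {:?}", args.batch_size);
    }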
2 changes: 1 addition & 1 deletion datafusion-cli/src/pool_type.rs
@@ -20,7 +20,7 @@ use std::{
     str::FromStr,
 };

-#[derive(PartialEq, Debug)]
+#[derive(PartialEq, Debug, Clone)]
 pub enum PoolType {
     Greedy,
     Fair,
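
Note: `Clone` is added because clap 4 stores parsed values type-erased inside `ArgMatches` and clones them out on access, so (to the best of my reading of clap 4's bounds) any type produced by a `value_parser` must be `Clone`. A hypothetical stand-in for `PoolType` illustrating the requirement:

    use clap::Parser;
    use std::str::FromStr;

    // Hypothetical stand-in for PoolType; without the Clone derive,
    // using it as a clap 4 argument value fails to compile.
    #[derive(PartialEq, Debug, Clone)]
    enum DemoPool {
        Greedy,
        Fair,
    }

    impl FromStr for DemoPool {
        type Err = String;
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            match s {
                "Greedy" => Ok(DemoPool::Greedy),
                "Fair" => Ok(DemoPool::Fair),
                _ => Err(format!("unknown pool type '{}'", s)),
            }
        }
    }

    #[derive(Parser)]
    struct DemoArgs {
        // A function value parser built from FromStr; the parsed
        // DemoPool must be Clone for clap to store and return it.
        #[clap(long, value_parser = DemoPool::from_str)]
        pool: DemoPool,
    }

    fn main() {
        println!("{:?}", DemoArgs::parse().pool);
    }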
4 changes: 2 additions & 2 deletions datafusion-cli/src/print_format.rs
@@ -30,7 +30,7 @@ use datafusion::common::format::DEFAULT_FORMAT_OPTIONS;
 use datafusion::error::Result;

 /// Allow records to be printed in different formats
-#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, clap::ValueEnum, Clone, Copy)]
 pub enum PrintFormat {
     Csv,
     Tsv,
@@ -44,7 +44,7 @@ impl FromStr for PrintFormat {
     type Err = String;

     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        clap::ArgEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
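
Note: `ArgEnum` was renamed to `ValueEnum` in clap 4; the trait still provides a `from_str(input, ignore_case)` helper, which is what the `FromStr` impl above delegates to. A small sketch assuming only the documented `ValueEnum` API (names are illustrative):

    use clap::ValueEnum;

    #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
    enum DemoFormat {
        Csv,
        Tsv,
        Json,
    }

    fn main() {
        // The second argument requests case-insensitive matching,
        // mirroring clap::ValueEnum::from_str(s, true) in the diff.
        let f = <DemoFormat as ValueEnum>::from_str("CSV", true).unwrap();
        assert_eq!(f, DemoFormat::Csv);
        println!("parsed {:?}", f);
    }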
4 changes: 2 additions & 2 deletions datafusion/catalog/src/catalog.rs
@@ -34,7 +34,7 @@ use datafusion_common::Result;
 /// * [`CatalogProviderList`]: a collection of `CatalogProvider`s
 /// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems)
 /// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems)
-/// * [`TableProvider]`: individual tables
+/// * [`TableProvider`]: individual tables
 ///
 /// # Implementing Catalogs
 ///
@@ -99,7 +99,7 @@ use datafusion_common::Result;
 /// [delta-rs]: https://github.com/delta-io/delta-rs
 /// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
 ///
-/// [`TableProvider]: crate::datasource::TableProvider
+/// [`TableProvider`]: crate::TableProvider
 pub trait CatalogProvider: Sync + Send {
     /// Returns the catalog provider as [`Any`]
65 changes: 0 additions & 65 deletions datafusion/common/src/hash_utils.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
 use ahash::RandomState;
 use arrow::array::*;
 use arrow::datatypes::*;
-use arrow::row::Rows;
 #[cfg(not(feature = "force_hash_collisions"))]
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use arrow_buffer::IntervalDayTime;
@@ -363,38 +362,6 @@ pub fn create_hashes<'a>(
     Ok(hashes_buffer)
 }

-/// Test version of `create_row_hashes` that produces the same value for
-/// all hashes (to test collisions)
-///
-/// See comments on `hashes_buffer` for more details
-#[cfg(feature = "force_hash_collisions")]
-pub fn create_row_hashes<'a>(
-    _rows: &[Vec<u8>],
-    _random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>> {
-    for hash in hashes_buffer.iter_mut() {
-        *hash = 0
-    }
-    Ok(hashes_buffer)
-}
-
-/// Creates hash values for every row, based on their raw bytes.
-#[cfg(not(feature = "force_hash_collisions"))]
-pub fn create_row_hashes<'a>(
-    rows: &[Vec<u8>],
-    random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>> {
-    for hash in hashes_buffer.iter_mut() {
-        *hash = 0
-    }
-    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
-        *hash = random_state.hash_one(&rows[i]);
-    }
-    Ok(hashes_buffer)
-}
-
 /// Creates hash values for every row, based on the values in the
 /// columns.
 ///
@@ -468,38 +435,6 @@ pub fn create_hashes<'a>(
     Ok(hashes_buffer)
 }

-/// Test version of `create_row_hashes_v2` that produces the same value for
-/// all hashes (to test collisions)
-///
-/// See comments on `hashes_buffer` for more details
-#[cfg(feature = "force_hash_collisions")]
-pub fn create_row_hashes_v2<'a>(
-    _rows: &Rows,
-    _random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>> {
-    for hash in hashes_buffer.iter_mut() {
-        *hash = 0
-    }
-    Ok(hashes_buffer)
-}
-
-/// Creates hash values for every row, based on their raw bytes.
-#[cfg(not(feature = "force_hash_collisions"))]
-pub fn create_row_hashes_v2<'a>(
-    rows: &Rows,
-    random_state: &RandomState,
-    hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>> {
-    for hash in hashes_buffer.iter_mut() {
-        *hash = 0
-    }
-    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
-        *hash = random_state.hash_one(rows.row(i));
-    }
-    Ok(hashes_buffer)
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
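
Note: the deleted `create_row_hashes`/`create_row_hashes_v2` helpers hashed pre-serialized rows and are removed here as dead code; the column-based `create_hashes` path stays. A rough usage sketch of that remaining function, based on its signature as shown in this diff (treat the exact re-export path as an assumption):

    use std::sync::Arc;

    use ahash::RandomState;
    use arrow::array::{ArrayRef, Int32Array};
    use datafusion_common::hash_utils::create_hashes;

    fn main() -> datafusion_common::Result<()> {
        // One hash per row, computed column by column.
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let mut hashes_buffer = vec![0u64; col.len()];
        create_hashes(&[col], &random_state, &mut hashes_buffer)?;
        println!("{:?}", hashes_buffer);
        Ok(())
    }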