Skip to content

Commit

Permalink
Merge pull request #6 from dapper91/dev
Browse files Browse the repository at this point in the history
- ExternalChunkError Display trait bug fixed.
- binary crate added.
- sorting custom comparator feature implemented.
  • Loading branch information
dapper91 authored Jan 11, 2022
2 parents 41960de + ac54edf commit 2362cad
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 37 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ext-sort"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
license = "Unlicense"
description = "rust external sort algorithm implementation"
Expand All @@ -15,6 +15,7 @@ keywords = ["algorithms", "sort", "sorting", "external-sort", "external"]

[dependencies]
bytesize = { version = "^1.1", optional = true }
clap = { version = "^3.0", features = ["derive"], optional = true }
deepsize = { version = "^0.2", optional = true }
env_logger = { version = "^0.9", optional = true}
log = "^0.4"
Expand All @@ -30,6 +31,10 @@ rand = "^0.8"
[features]
memory-limit = ["deepsize"]

[[bin]]
name = "ext-sort"
required-features = ["bytesize", "clap", "env_logger"]

[[example]]
name = "quickstart"
required-features = ["bytesize", "env_logger"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Activate `memory-limit` feature of the ext-sort crate on Cargo.toml:

```toml
[dependencies]
ext-sort = { version = "^0.1.1", features = ["memory-limit"] }
ext-sort = { version = "^0.1.2", features = ["memory-limit"] }
```

``` rust
Expand Down
5 changes: 4 additions & 1 deletion src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ impl<S: Error> Error for ExternalChunkError<S> {}

impl<S: Error> Display for ExternalChunkError<S> {
    /// Writes the message of the wrapped error, whichever variant holds it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Formatting `self` with `{}` here would call this very method again and never
        // terminate, so every variant forwards to the error it wraps instead.
        match self {
            ExternalChunkError::IO(err) => write!(f, "{}", err),
            ExternalChunkError::SerializationError(err) => write!(f, "{}", err),
        }
    }
}

Expand Down
225 changes: 225 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
use std::fs;
use std::io::{self, prelude::*};
use std::path;
use std::process;

use bytesize::ByteSize;
use clap::ArgEnum;
use env_logger;
use log;

use ext_sort::buffer::mem::MemoryLimitedBufferBuilder;
use ext_sort::{ExternalSorter, ExternalSorterBuilder};

fn main() {
let arg_parser = build_arg_parser();

let log_level: LogLevel = arg_parser.value_of_t_or_exit("log_level");
init_logger(log_level);

let order: Order = arg_parser.value_of_t_or_exit("sort");
let tmp_dir: Option<&str> = arg_parser.value_of("tmp_dir");
let chunk_size = arg_parser.value_of("chunk_size").expect("value is required");
let threads: Option<usize> = arg_parser
.is_present("threads")
.then(|| arg_parser.value_of_t_or_exit("threads"));

let input = arg_parser.value_of("input").expect("value is required");
let input_stream = match fs::File::open(input) {
Ok(file) => io::BufReader::new(file),
Err(err) => {
log::error!("input file opening error: {}", err);
process::exit(1);
}
};

let output = arg_parser.value_of("output").expect("value is required");
let mut output_stream = match fs::File::create(output) {
Ok(file) => io::BufWriter::new(file),
Err(err) => {
log::error!("output file creation error: {}", err);
process::exit(1);
}
};

let mut sorter_builder = ExternalSorterBuilder::new();
if let Some(threads) = threads {
sorter_builder = sorter_builder.with_threads_number(threads);
}

if let Some(tmp_dir) = tmp_dir {
sorter_builder = sorter_builder.with_tmp_dir(path::Path::new(tmp_dir));
}

sorter_builder = sorter_builder.with_buffer(MemoryLimitedBufferBuilder::new(
chunk_size.parse::<ByteSize>().expect("value is pre-validated").as_u64(),
));

let sorter: ExternalSorter<String, io::Error, _> = match sorter_builder.build() {
Ok(sorter) => sorter,
Err(err) => {
log::error!("sorter initialization error: {}", err);
process::exit(1);
}
};

let compare = |a: &String, b: &String| {
if order == Order::Asc {
a.cmp(&b)
} else {
a.cmp(&b).reverse()
}
};

let sorted_stream = match sorter.sort_by(input_stream.lines(), compare) {
Ok(sorted_stream) => sorted_stream,
Err(err) => {
log::error!("data sorting error: {}", err);
process::exit(1);
}
};

for line in sorted_stream {
let line = match line {
Ok(line) => line,
Err(err) => {
log::error!("sorting stream error: {}", err);
process::exit(1);
}
};
if let Err(err) = output_stream.write_all(format!("{}\n", line).as_bytes()) {
log::error!("data saving error: {}", err);
process::exit(1);
};
}

if let Err(err) = output_stream.flush() {
log::error!("data flushing error: {}", err);
process::exit(1);
}
}

#[derive(Copy, Clone, clap::ArgEnum)]
enum LogLevel {
Off,
Error,
Warn,
Info,
Debug,
Trace,
}

impl LogLevel {
pub fn possible_values() -> impl Iterator<Item = clap::PossibleValue<'static>> {
Self::value_variants().iter().filter_map(|v| v.to_possible_value())
}
}

impl std::str::FromStr for LogLevel {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
<LogLevel as clap::ArgEnum>::from_str(s, false)
}
}

#[derive(Copy, Clone, PartialEq, clap::ArgEnum)]
enum Order {
Asc,
Desc,
}

impl Order {
pub fn possible_values() -> impl Iterator<Item = clap::PossibleValue<'static>> {
Order::value_variants().iter().filter_map(|v| v.to_possible_value())
}
}

impl std::str::FromStr for Order {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
<Order as clap::ArgEnum>::from_str(s, false)
}
}

/// Declares the command line interface of the tool and parses the process arguments.
///
/// `input`, `output` and `chunk_size` are required. `sort` defaults to `asc` and `log_level`
/// to `info`. `threads` and `tmp_dir` are optional.
fn build_arg_parser() -> clap::ArgMatches {
    let input = clap::Arg::new("input")
        .short('i')
        .long("input")
        .help("file to be sorted")
        .required(true)
        .takes_value(true);

    let output = clap::Arg::new("output")
        .short('o')
        .long("output")
        .help("result file")
        .required(true)
        .takes_value(true);

    let sort = clap::Arg::new("sort")
        .short('s')
        .long("sort")
        .help("sorting order")
        .takes_value(true)
        .default_value("asc")
        .possible_values(Order::possible_values());

    let log_level = clap::Arg::new("log_level")
        .short('l')
        .long("loglevel")
        .help("logging level")
        .takes_value(true)
        .default_value("info")
        .possible_values(LogLevel::possible_values());

    let threads = clap::Arg::new("threads")
        .short('t')
        .long("threads")
        .help("number of threads to use for parallel sorting")
        .takes_value(true);

    let tmp_dir = clap::Arg::new("tmp_dir")
        .short('d')
        .long("tmp-dir")
        .help("directory to be used to store temporary data")
        .takes_value(true);

    // The value is only checked here; `main` parses it again to obtain the byte count.
    let chunk_size = clap::Arg::new("chunk_size")
        .short('c')
        .long("chunk-size")
        .help("chunk size")
        .required(true)
        .takes_value(true)
        .validator(|value| {
            value
                .parse::<ByteSize>()
                .map(|_| ())
                .map_err(|err| format!("Chunk size format incorrect: {}", err))
        });

    // Arguments are registered in the same order as before.
    clap::App::new("ext-sort")
        .author("Dmitry P. <[email protected]>")
        .about("external sorter")
        .arg(input)
        .arg(output)
        .arg(sort)
        .arg(log_level)
        .arg(threads)
        .arg(tmp_dir)
        .arg(chunk_size)
        .get_matches()
}

/// Initializes `env_logger` with the level filter matching `log_level` and timestamps
/// formatted with millisecond precision.
fn init_logger(log_level: LogLevel) {
    // Each CLI level maps to the `log::LevelFilter` of the same name.
    let level_filter = match log_level {
        LogLevel::Off => log::LevelFilter::Off,
        LogLevel::Error => log::LevelFilter::Error,
        LogLevel::Warn => log::LevelFilter::Warn,
        LogLevel::Info => log::LevelFilter::Info,
        LogLevel::Debug => log::LevelFilter::Debug,
        LogLevel::Trace => log::LevelFilter::Trace,
    };

    let mut builder = env_logger::Builder::new();
    builder
        .filter_level(level_filter)
        .format_timestamp_millis()
        .init();
}
Loading

0 comments on commit 2362cad

Please sign in to comment.