diff --git a/Cargo.toml b/Cargo.toml
index 83bc4d1..3e9695a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,4 +3,23 @@
 name = "catcsv"
 version = "0.1.0"
 authors = ["Eric Kidd "]
+description = "Concatenate directories of (possibly-compressed) CSV files into one CSV file"
+license = "MIT/Apache-2.0"
+readme = "README.md"
+repository = "https://github.com/faradayio/catcsv"
+
+[dev-dependencies]
+cli_test_dir = "0.1.2"
+
 [dependencies]
+# Enable this by passing `--features "clippy"` to cargo. Needs nightly Rust.
+clippy = { version = "0.0.*", optional = true }
+csv = "0.15.0"
+docopt = "0.7.0"
+env_logger = "0.4.2"
+error-chain = "0.10.0"
+log = "0.3.7"
+# Deprecated in favor of serde, but needed for docopt:
+rustc-serialize = "0.3"
+snap = "0.2.1"
+walkdir = "1.0.7"
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bdb1f96
--- /dev/null
+++ b/README.md
@@ -0,0 +1,48 @@
+# `catcsv`: Concatenate directories of possibly-compressed CSV files
+
+This is a small utility that we use to reassemble many small CSV files into
+much larger ones. In our case, the small CSV files are generated by
+highly-parallel [Pachyderm][] pipelines doing map/reduce-style operations.
+
+Usage:
+
+```
+catcsv - Combine many CSV files into one
+
+Usage:
+  catcsv <input_file_or_dir>...
+  catcsv (--help | --version)
+
+Options:
+  --help     Show this screen.
+  --version  Show version.
+
+Input files must have the extension *.csv or *.csv.sz. The latter are assumed
+to be in Google's "snappy framed" format: https://github.com/google/snappy
+
+If passed a directory, this will recurse over all files in that directory.
+```
+
+## Wish list
+
+If you'd like to add support for other common compression formats, such as
+`*.gz`, we'll happily accept PRs that depend on either pure Rust crates, or
+which include C code in the crate but still cross-compile easily with musl.
+
+## Related utilities
+
+If you're interested in this utility, you might also be interested in:
+
+- BurntSushi's excellent [xsv][] utility, which features a wide variety of
+  subcommands for working with CSV files. Among these is a powerful `xsv
+  cat` command, which has many options that `catcsv` doesn't (but which
+  doesn't do directory walking or automatic decompression as far as I
+  know).
+- Faraday's [scrubcsv][] utility, which attempts to normalize non-standard
+  CSV files.
+
+[xsv]: https://github.com/BurntSushi/xsv
+[scrubcsv]: https://github.com/faradayio/scrubcsv
+[Pachyderm]: https://www.pachyderm.io
diff --git a/fixtures/test.csv b/fixtures/test.csv
new file mode 100644
index 0000000..ce3b6ee
--- /dev/null
+++ b/fixtures/test.csv
@@ -0,0 +1,2 @@
+col1,col2
+a,b
diff --git a/fixtures/test.csv.sz b/fixtures/test.csv.sz
new file mode 100644
index 0000000..f53526b
Binary files /dev/null and b/fixtures/test.csv.sz differ
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..3389118
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1,9 @@
+use_try_shorthand = true
+ideal_width = 80
+# This is about how big emk's standard Emacs window is, and as the biggest
+# contributor, he gets to abuse his power.
+max_width = 87
+
+# Try out the new formatting conventions.
+where_style = "Rfc"
+generics_style = "Rfc"
diff --git a/src/lib.rs b/src/lib.rs
deleted file mode 100644
index cdfbe1a..0000000
--- a/src/lib.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-    }
-}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..a1d8f7b
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,178 @@
+//! Given a list of CSV files, directories containing CSV files, compressed
+//! CSV files, or some combination of the above, concatenate all the CSV files
+//! together as quickly as we can.
+//!
+//! This replaces a 20-line shell script with two pages of Rust. But it has
+//! a much nicer UI and it handles the corner cases better.
+
+// `error_chain!` can recurse deeply.
+#![recursion_limit = "1024"]
+
+// Enable clippy if we were asked to do so.
+#![cfg_attr(feature="clippy", feature(plugin))]
+#![cfg_attr(feature="clippy", plugin(clippy))]
+
+extern crate csv;
+extern crate docopt;
+extern crate env_logger;
+#[macro_use]
+extern crate error_chain;
+#[macro_use]
+extern crate log;
+extern crate rustc_serialize;
+extern crate snap;
+extern crate walkdir;
+
+use csv::ByteString;
+use docopt::Docopt;
+use std::borrow::Cow;
+use std::fs::File;
+use std::io::{self, BufReader, BufWriter};
+use std::io::prelude::*;
+use std::process;
+use walkdir::WalkDir;
+
+use errors::*;
+
+/// A module to hold `Error`, etc., types generated by `error-chain`.
+mod errors {
+    use csv;
+    use std::io;
+    use std::path::PathBuf;
+    use walkdir;
+
+    error_chain! {
+        foreign_links {
+            Csv(csv::Error);
+            Io(io::Error);
+            WalkDir(walkdir::Error);
+        }
+
+        errors {
+            ReadFile(path: PathBuf) {
+                description("error reading file")
+                display("error reading '{}'", path.display())
+            }
+        }
+    }
+}
+
+// Ask `error-chain` to build us a `main` function that prints errors, and
+// delegate the real work to `run`.
+quick_main!(run);
+
+const USAGE: &'static str = "
+catcsv - Combine many CSV files into one
+
+Usage:
+  catcsv <input_file_or_dir>...
+  catcsv (--help | --version)
+
+Options:
+  --help     Show this screen.
+  --version  Show version.
+
+Input files must have the extension *.csv or *.csv.sz. The latter are assumed
+to be in Google's \"snappy framed\" format: https://github.com/google/snappy
+
+If passed a directory, this will recurse over all files in that directory.
+";
+
+/// Our command-line arguments.
+#[derive(Debug, RustcDecodable)]
+struct Args {
+    /// Should we show the version of the program and exit?
+    flag_version: bool,
+    /// A list of files and directories to concatenate to standard output.
+    arg_input_file_or_dir: Vec<String>,
+}
+
+/// Our real `main` function that parses arguments and figures out what to do.
+fn run() -> Result<()> {
+    env_logger::init().expect("could not initialize log subsystem");
+
+    let args: Args = Docopt::new(USAGE)
+        .and_then(|dopt| dopt.decode())
+        .unwrap_or_else(|e| e.exit());
+    trace!("{:?}", args);
+
+    // Report our version.
+    if args.flag_version {
+        println!("catcsv {}", env!("CARGO_PKG_VERSION"));
+        process::exit(0);
+    }
+
+    // Lock and buffer stdout for maximum performance.
+    let stdout = io::stdout();
+    let stdout_locked = stdout.lock();
+    let mut out = BufWriter::new(stdout_locked);
+
+    // Iterate over our arguments. We do this without using recursion, mostly
+    // to see how that looks in Rust.
+    let mut first_headers: Option<Vec<ByteString>> = None;
+    for input in &args.arg_input_file_or_dir {
+        for entry in WalkDir::new(input) {
+            let entry = entry?;
+            if entry.file_type().is_file() {
+                debug!("Found file: {}", entry.path().display());
+                let filename: Cow<str> = entry.file_name().to_string_lossy();
+                let path = entry.path();
+                let mkerr = || ErrorKind::ReadFile(path.to_owned());
+
+                // Check the filename to see if we can handle this file type.
+                if filename.ends_with(".csv") {
+                    let mut file = File::open(path).chain_err(&mkerr)?;
+                    output_csv(&mut file, &mut first_headers, &mut out)
+                        .chain_err(&mkerr)?;
+                } else if filename.ends_with(".csv.sz") {
+                    let file = File::open(path).chain_err(&mkerr)?;
+                    let mut decompressed = snap::Reader::new(file);
+                    output_csv(&mut decompressed, &mut first_headers, &mut out)
+                        .chain_err(&mkerr)?;
+                } else {
+                    let msg = format!("{} does not appear to be a CSV file",
+                                      path.display());
+                    return Err(msg.into());
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Output the specified CSV data to standard output.
+fn output_csv(file: &mut Read,
+              first_headers: &mut Option<Vec<ByteString>>,
+              output: &mut Write)
+              -> Result<()> {
+    // Force buffered input for a big performance boost and so we can
+    // get the first line.
+    let mut input = BufReader::new(file);
+
+    // Get the first line of headers, and parse it as CSV.
+    //
+    // NOTE: This will fail if there are escaped newlines in the header line.
+    let mut first_line = String::new();
+    input.read_line(&mut first_line)?;
+    let mut rdr = csv::Reader::from_string(first_line.clone());
+
+    // Get our header line only.
+    let headers = rdr.byte_headers()?;
+
+    // If this is the first set of headers we've found, save them. If not,
+    // make sure that the headers match between files.
+    if let Some(ref first_headers) = *first_headers {
+        if &headers != first_headers {
+            return Err("CSV headers are different from the first file's".into());
+        }
+    } else {
+        *first_headers = Some(headers);
+        output.write_all(first_line.as_bytes())?;
+    }
+
+    // Do the fastest pass-through copy the standard library can manage.
+    io::copy(&mut input, output)?;
+
+    Ok(())
+}
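The header check in `output_csv` above compares raw byte headers, so even a single renamed column counts as a mismatch. Here is a minimal standalone sketch of that comparison, using the same `csv` 0.15 calls the patch relies on (`Reader::from_string` and `byte_headers`) on hypothetical in-memory data:

```rust
extern crate csv;

fn main() {
    // Parse only the header rows of two tiny CSV documents (hypothetical
    // data) and compare them byte-for-byte, as `output_csv` does between
    // the first input file and every later one.
    let mut first = csv::Reader::from_string("col1,col2\na,b\n");
    let mut later = csv::Reader::from_string("col1,col3\nc,d\n");
    let first_headers = first.byte_headers().expect("invalid CSV header");
    let later_headers = later.byte_headers().expect("invalid CSV header");
    // `catcsv` would return an error at this point, because the headers
    // of the later file differ from the first file's.
    assert!(first_headers != later_headers);
}
```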
diff --git a/tests/tests.rs b/tests/tests.rs
new file mode 100644
index 0000000..c2829d7
--- /dev/null
+++ b/tests/tests.rs
@@ -0,0 +1,36 @@
+extern crate cli_test_dir;
+
+use cli_test_dir::*;
+
+#[test]
+fn cat_a_csv_and_csv_sz() {
+    let testdir = TestDir::new("catcsv", "cat_a_csv_and_csv_sz");
+    let output = testdir
+        .cmd()
+        .arg(testdir.src_path("fixtures/test.csv"))
+        .arg(testdir.src_path("fixtures/test.csv.sz"))
+        .output()
+        .expect_success();
+    assert_eq!(output.stdout_str(),
+               "\
+col1,col2
+a,b
+a,b
+");
+}
+
+#[test]
+fn cat_a_csv_and_csv_sz_in_a_dir() {
+    let testdir = TestDir::new("catcsv", "cat_a_csv_and_csv_sz_in_a_dir");
+    let output = testdir
+        .cmd()
+        .arg(testdir.src_path("fixtures"))
+        .output()
+        .expect_success();
+    assert_eq!(output.stdout_str(),
+               "\
+col1,col2
+a,b
+a,b
+");
+}
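For reference, the binary `fixtures/test.csv.sz` fixture that these tests read can be regenerated from `fixtures/test.csv` with a short sketch like the one below. This assumes `snap::Writer`, the framed-format counterpart to the `snap::Reader` used in `main.rs`, from the same `snap = "0.2.1"` release:

```rust
extern crate snap;

use std::fs::File;
use std::io;

fn main() {
    // Read the uncompressed fixture...
    let mut input = File::open("fixtures/test.csv")
        .expect("could not open fixtures/test.csv");
    // ...and write it back out in "snappy framed" format, the encoding
    // that `catcsv` expects for `*.csv.sz` inputs.
    let output = File::create("fixtures/test.csv.sz")
        .expect("could not create fixtures/test.csv.sz");
    let mut compressed = snap::Writer::new(output);
    io::copy(&mut input, &mut compressed)
        .expect("could not compress the test fixture");
}
```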