Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
v0.1.0: Implement directory walking, *.sz support and concatenation
Browse files Browse the repository at this point in the history
This is the first useful version, supporting what we currently need to
do at Faraday.
  • Loading branch information
emk committed May 15, 2017
1 parent 5b5feaa commit 0d00c23
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 6 deletions.
19 changes: 19 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,23 @@ name = "catcsv"
version = "0.1.0"
authors = ["Eric Kidd <[email protected]>"]

description = "Concatenate directories of (possibly-compressed CSV) files into one CSV file"
license = "MIT/Apache-2.0"
readme = "README.md"
repository = "https://github.com/faradayio/catcsv"

[dev-dependencies]
cli_test_dir = "0.1.2"

[dependencies]
# Enable this by passing `--features "clippy"` to cargo. Needs nightly Rust.
clippy = { version = "0.0.*", optional = true }
csv = "0.15.0"
docopt = "0.7.0"
env_logger = "0.4.2"
error-chain = "0.10.0"
log = "0.3.7"
# Deprecated in favor of serde, but needed for docopt:
rustc-serialize = "0.3"
snap = "0.2.1"
walkdir = "1.0.7"
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# `catcsv`: Concatenate directories of possibly-compressed CSV files

This is a small utility that we use to reassemble many small CSV files into
much larger ones. In our case, the small CSV files are generated in
highly-parallel fashion by [Pachyderm][] pipelines doing map/reduce-style
operations.

Usage:

```
catcsv - Combine many CSV files into one
Usage:
catcsv <input-file-or-dir>...
catcsv (--help | --version)
Options:
--help Show this screen.
--version Show version.
Input files must have the extension *.csv or *.csv.sz. The latter are assumed
to be in Google's "snappy framed" format: https://github.com/google/snappy
If passed a directory, this will recurse over all files in that directory.
```

## Wish list

If you'd like to add support for other common compression formats, such as `*.gz`,
we'll happily accept PRs that depend on either pure Rust crates, or which
include C code in the crate but still cross-compile easily with musl.

## Related utilities

If you're interested in this utility, you might also be interested in:

- BurntSushi's excellent [xsv][] utility, which features a wide variety of
subcommands for working with CSV files. Among these is a powerful `xsv
cat` command, which has many options that `catcsv` doesn't (but which
doesn't do directory walking or automatic decompression as far as I
know).
- Faraday's [scrubcsv][] utility, which attempts to normalize non-standard
CSV files.


[xsv]: https://github.com/BurntSushi/xsv
[scrubcsv]: https://github.com/faradayio/scrubcsv
[Pachyderm]: https://www.pachyderm.io
2 changes: 2 additions & 0 deletions fixtures/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
col1,col2
a,b
Binary file added fixtures/test.csv.sz
Binary file not shown.
9 changes: 9 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use_try_shorthand = true
ideal_width = 80
# This is about how big emk's standard Emacs window is, and as the biggest
# contributor, he gets to abuse his power.
max_width = 87

# Try out the new formatting conventions.
where_style = "Rfc"
generics_style = "Rfc"
6 changes: 0 additions & 6 deletions src/lib.rs

This file was deleted.

178 changes: 178 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
//! Given a list of CSV files, directories containing CSV files, compressed
//! CSV files, or some combination of the above, concatenate all the CSV files
//! together as quickly as we can.
//!
//! This replaces a 20-line shell script with two pages of Rust. But it has
//! a much nicer UI and it handles the corner cases better.
// `error_chain!` can recurse deeply
#![recursion_limit = "1024"]

// Enable clippy if we were asked to do so.
#![cfg_attr(feature="clippy", feature(plugin))]
#![cfg_attr(feature="clippy", plugin(clippy))]

extern crate csv;
extern crate docopt;
extern crate env_logger;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
extern crate rustc_serialize;
extern crate snap;
extern crate walkdir;

use csv::ByteString;
use docopt::Docopt;
use std::borrow::Cow;
use std::fs::File;
use std::io::{self, BufReader, BufWriter};
use std::io::prelude::*;
use std::process;
use walkdir::WalkDir;

use errors::*;

/// A module to hold `Error`, etc., types generated by `error-chain`.
mod errors {
    use csv;
    use std::io;
    use std::path::PathBuf;
    use walkdir;

    error_chain! {
        // Errors from our dependencies, automatically convertible into our
        // own `Error` type so `?` works on them.
        foreign_links {
            Csv(csv::Error);
            Io(io::Error);
            WalkDir(walkdir::Error);
        }

        errors {
            // We failed to read the file at `path`.
            ReadFile(path: PathBuf) {
                description("error reading file")
                // Bug fix: the closing quote around the path was missing,
                // yielding messages like `error reading 'foo.csv`.
                display("error reading '{}'", path.display())
            }
        }
    }
}

// Ask `error-chain` to build us a `main` function that prints errors, and
// delegate the real work to `run`.
quick_main!(run);

/// The docopt help text. Docopt parses this string to build the
/// command-line parser, so the `Usage:`/`Options:` sections double as the
/// actual argument specification decoded into `Args`.
const USAGE: &'static str = "
catcsv - Combine many CSV files into one
Usage:
catcsv <input-file-or-dir>...
catcsv (--help | --version)
Options:
--help Show this screen.
--version Show version.
Input files must have the extension *.csv or *.csv.sz. The latter are assumed
to be in Google's \"snappy framed\" format: https://github.com/google/snappy
If passed a directory, this will recurse over all files in that directory.
";

/// Our command-line arguments, decoded from `USAGE` by docopt.
///
/// NOTE: Field names follow docopt's convention (`flag_*` for options,
/// `arg_*` for positional arguments) and must not be renamed.
#[derive(Debug, RustcDecodable)]
struct Args {
    /// Should we show the version of the program and exit?
    flag_version: bool,
    /// A list of input files and directories to read and concatenate
    /// (the `<input-file-or-dir>...` positional arguments).
    arg_input_file_or_dir: Vec<String>,
}

/// Our real `main` function: parse the arguments, then concatenate every
/// CSV file we are pointed at onto standard output.
fn run() -> Result<()> {
    env_logger::init().expect("could not initialize log subsystem");

    // Decode the command line against `USAGE`; docopt exits for us on
    // `--help` or malformed arguments.
    let args: Args = Docopt::new(USAGE)
        .and_then(|dopt| dopt.decode())
        .unwrap_or_else(|e| e.exit());
    trace!("{:?}", args);

    // Handle `--version` before doing any real work.
    if args.flag_version {
        println!("catcsv {}", env!("CARGO_PKG_VERSION"));
        process::exit(0);
    }

    // Lock and buffer stdout for maximum performance.
    let stdout = io::stdout();
    let mut out = BufWriter::new(stdout.lock());

    // Headers of the first CSV file we see; every later file must match.
    let mut first_headers: Option<Vec<ByteString>> = None;

    // Walk each argument in turn, recursing into directories. (No explicit
    // recursion here — `WalkDir` does the tree traversal for us.)
    for input in &args.arg_input_file_or_dir {
        for entry in WalkDir::new(input) {
            let entry = entry?;

            // Skip anything that isn't a regular file (directories, etc.).
            if !entry.file_type().is_file() {
                continue;
            }
            debug!("Found file: {}", entry.path().display());

            let name: Cow<str> = entry.file_name().to_string_lossy();
            let path = entry.path();
            let mkerr = || ErrorKind::ReadFile(path.to_owned());

            // Dispatch on the filename extension. Note that `*.csv` and
            // `*.csv.sz` are disjoint suffixes, so order doesn't matter.
            if name.ends_with(".csv") {
                let mut file = File::open(path).chain_err(&mkerr)?;
                output_csv(&mut file, &mut first_headers, &mut out)
                    .chain_err(&mkerr)?;
            } else if name.ends_with(".csv.sz") {
                // Wrap the file in a streaming snappy decompressor.
                let file = File::open(path).chain_err(&mkerr)?;
                let mut decompressed = snap::Reader::new(file);
                output_csv(&mut decompressed, &mut first_headers, &mut out)
                    .chain_err(&mkerr)?;
            } else {
                let msg = format!("{} does not appear to be a CSV file",
                                  path.display());
                return Err(msg.into());
            }
        }
    }

    Ok(())
}

/// Copy one CSV input stream to `output`, writing its header line only for
/// the first file seen and verifying that every later file's headers match.
///
/// `file` is an (already-decompressed) CSV byte stream. `first_headers` is
/// shared state across calls: `None` until the first file has been read,
/// then `Some(headers)` used to validate all subsequent files.
fn output_csv(file: &mut Read,
              first_headers: &mut Option<Vec<ByteString>>,
              output: &mut Write)
              -> Result<()> {
    // Force buffered input for a big performance boost and so we can
    // get the first line.
    let mut input = BufReader::new(file);

    // Get the first line of headers, and parse it as CSV.
    //
    // NOTE: This will fail if there are escaped newlines in the header line.
    let mut first_line = String::new();
    input.read_line(&mut first_line)?;
    let mut rdr = csv::Reader::from_string(first_line.clone());

    // Get our header line only.
    let headers = rdr.byte_headers()?;

    // If this is the first set of headers we've found, save them. If not,
    // make sure that the headers match between files.
    if let Some(ref first_headers) = *first_headers {
        if &headers != first_headers {
            return Err("CSV headers are different from the first file's".into());
        }
    } else {
        // First file: remember the headers and pass the raw header line
        // through to the output unmodified.
        *first_headers = Some(headers);
        output.write_all(first_line.as_bytes())?;
    }

    // Do the fastest pass-through copy the standard library can manage.
    // (The remaining rows are streamed verbatim, not re-parsed as CSV.)
    io::copy(&mut input, output)?;

    Ok(())
}
36 changes: 36 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
extern crate cli_test_dir;

use cli_test_dir::*;

/// Concatenating one plain and one snappy-compressed CSV file, passed as
/// explicit file arguments, should yield a single header plus both rows.
#[test]
fn cat_a_csv_and_csv_sz() {
    let testdir = TestDir::new("catcsv", "cat_a_csv_and_csv_sz");

    // Resolve both fixture paths relative to the source tree.
    let plain = testdir.src_path("fixtures/test.csv");
    let compressed = testdir.src_path("fixtures/test.csv.sz");

    let output = testdir
        .cmd()
        .arg(plain)
        .arg(compressed)
        .output()
        .expect_success();

    // One header line, then one data row from each input file.
    let expected = "\
col1,col2
a,b
a,b
";
    assert_eq!(output.stdout_str(), expected);
}

/// Passing a directory should recurse over both fixture files inside it and
/// produce the same concatenated output as naming them explicitly.
#[test]
fn cat_a_csv_and_csv_sz_in_a_dir() {
    let testdir = TestDir::new("catcsv", "cat_a_csv_and_csv_sz_in_a_dir");

    // Hand the whole fixtures directory to the binary as one argument.
    let fixtures_dir = testdir.src_path("fixtures");
    let output = testdir
        .cmd()
        .arg(fixtures_dir)
        .output()
        .expect_success();

    // Header appears once; both files contribute an `a,b` row.
    let expected = "\
col1,col2
a,b
a,b
";
    assert_eq!(output.stdout_str(), expected);
}

0 comments on commit 0d00c23

Please sign in to comment.