Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
v0.1.1: Handle symlinks and pipes, fail if no files found
Browse files Browse the repository at this point in the history
- We now process named pipes (for Pachyderm) and symlinks.
- We want to guarantee that our output file always has headers (it's a
  nice invariant), so we fail if we have no input files.
- We also add more extensive debug logging.
  • Loading branch information
emk committed May 16, 2017
1 parent 0d00c23 commit 4aac0ad
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "catcsv"
version = "0.1.0"
version = "0.1.1"
authors = ["Eric Kidd <[email protected]>"]

description = "Concatenate directories of (possibly-compressed CSV) files into one CSV file"
Expand Down
23 changes: 20 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn run() -> Result<()> {
let args: Args = Docopt::new(USAGE)
.and_then(|dopt| dopt.decode())
.unwrap_or_else(|e| e.exit());
trace!("{:?}", args);
debug!("{:?}", args);

// Report our version.
if args.flag_version {
Expand All @@ -110,21 +110,28 @@ fn run() -> Result<()> {
// Iterate over our arguments. We do this without using recursion, mostly
// to see how that looks in Rust.
let mut first_headers: Option<Vec<ByteString>> = None;
let mut files_processed: u64 = 0;
for input in &args.arg_input_file_or_dir {
for entry in WalkDir::new(input) {
for entry in WalkDir::new(input).follow_links(true) {
let entry = entry?;
if entry.file_type().is_file() {

// We want to skip directories, but process files _and_ pipes.
// Pipes are critical when working with Pachyderm, which uses
// named pipes for inputs.
if !entry.file_type().is_dir() {
debug!("Found file: {}", entry.path().display());
let filename: Cow<str> = entry.file_name().to_string_lossy();
let path = entry.path();
let mkerr = || ErrorKind::ReadFile(path.to_owned());

// Check the filename to see if we can handle this file type.
if filename.ends_with(".csv") {
debug!("Processing as *.csv");
let mut file = File::open(path).chain_err(&mkerr)?;
output_csv(&mut file, &mut first_headers, &mut out)
.chain_err(&mkerr)?;
} else if filename.ends_with(".csv.sz") {
debug!("Processing as *.csv.sz");
let file = File::open(path).chain_err(&mkerr)?;
let mut decompressed = snap::Reader::new(file);
output_csv(&mut decompressed, &mut first_headers, &mut out)
Expand All @@ -134,10 +141,19 @@ fn run() -> Result<()> {
path.display());
return Err(msg.into());
}

// Keep track of how many files we processed.
files_processed += 1;
}
}
}

// If we don't have any files, we won't produce any headers, so
// fail with an error.
if files_processed == 0 {
return Err("No input CSV files found".into());
}

Ok(())
}

Expand Down Expand Up @@ -167,6 +183,7 @@ fn output_csv(file: &mut Read,
return Err("CSV headers are different from the first file's".into());
}
} else {
debug!("Using headers: {}", first_line);
*first_headers = Some(headers);
output.write_all(first_line.as_bytes())?;
}
Expand Down

0 comments on commit 4aac0ad

Please sign in to comment.