From 4aac0ad051efc2699ba97cad0b9dc94e6c411219 Mon Sep 17 00:00:00 2001 From: Eric Kidd Date: Tue, 16 May 2017 06:49:17 -0400 Subject: [PATCH] v0.1.1: Handle symlinks and pipes, fail if no files found - We now process named pipes (for Pachyderm) and symlinks. - We want to guarantee that our output file always has headers (it's a nice invariant), so we fail if we have no input files. - We also add more extensive debug logging. --- Cargo.toml | 2 +- src/main.rs | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3e9695a..4120a03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "catcsv" -version = "0.1.0" +version = "0.1.1" authors = ["Eric Kidd "] description = "Concatenate directories of (possibly-compressed CSV) files into one CSV file" diff --git a/src/main.rs b/src/main.rs index a1d8f7b..39ee9fc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -94,7 +94,7 @@ fn run() -> Result<()> { let args: Args = Docopt::new(USAGE) .and_then(|dopt| dopt.decode()) .unwrap_or_else(|e| e.exit()); - trace!("{:?}", args); + debug!("{:?}", args); // Report our version. if args.flag_version { @@ -110,10 +110,15 @@ fn run() -> Result<()> { // Iterate over our arguments. We do this without using recursion, mostly // to see how that looks in Rust. let mut first_headers: Option> = None; + let mut files_processed: u64 = 0; for input in &args.arg_input_file_or_dir { - for entry in WalkDir::new(input) { + for entry in WalkDir::new(input).follow_links(true) { let entry = entry?; - if entry.file_type().is_file() { + + // We want to skip directories, but process files _and_ pipes. + // Pipes are critical when working with Pachyderm, which uses + // named pipes for inputs. + if !entry.file_type().is_dir() { debug!("Found file: {}", entry.path().display()); let filename: Cow = entry.file_name().to_string_lossy(); let path = entry.path(); @@ -121,10 +126,12 @@ fn run() -> Result<()> { // Check the filename to see if we can handle this file type. if filename.ends_with(".csv") { + debug!("Processing as *.csv"); let mut file = File::open(path).chain_err(&mkerr)?; output_csv(&mut file, &mut first_headers, &mut out) .chain_err(&mkerr)?; } else if filename.ends_with(".csv.sz") { + debug!("Processing as *.csv.sz"); let file = File::open(path).chain_err(&mkerr)?; let mut decompressed = snap::Reader::new(file); output_csv(&mut decompressed, &mut first_headers, &mut out) @@ -134,10 +141,19 @@ fn run() -> Result<()> { path.display()); return Err(msg.into()); } + + // Keep track of how many files we processed. + files_processed += 1; } } } + // If we don't have any files, we won't produce any headers, so + // fail with an error. + if files_processed == 0 { + return Err("No input CSV files found".into()); + } + Ok(()) } @@ -167,6 +183,7 @@ fn output_csv(file: &mut Read, return Err("CSV headers are different from the first file's".into()); } } else { + debug!("Using headers: {}", first_line); *first_headers = Some(headers); output.write_all(first_line.as_bytes())?; }