diff --git a/ChangeLog.md b/ChangeLog.md new file mode 100644 index 0000000..2428d9e --- /dev/null +++ b/ChangeLog.md @@ -0,0 +1,6 @@ +## 0.0.9 +- Added -successDir and -failureDir +- Added return codes for the loader and unloader +- Refactored BoolStyle +- Cleaned up the readme a bit + diff --git a/README.md b/README.md index 5a774e4..1fd7d51 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,11 @@ loading of various types of delimited files, including ### Downloading This utility has already been built, and is available at -https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.8/cassandra-loader +https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.9/cassandra-loader Get it with wget: ``` -wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.8/cassandra-loader +wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.9/cassandra-loader ``` ### Building @@ -73,8 +73,7 @@ cassandra-loader -f myFileToLoad.csv -host 1.2.3.4 -schema "test.ltest(a, b, c, `-pw` | Password | none | Cassandra password `-numFutures` | Number of Futures | 1000 | Number of Java driver futures in flight. `-numRetries` | Number of retries | 1 | Number of times to retry the INSERT before declaring defeat. - `-numInserErrors` | Number of errors | 10 | Number of INSERT errors to tolerate before stopping. - `-queryTimeout | Timeout in seconds | 2 | Amount of time to wait for a query to finish before timing out. + `-queryTimeout` | Timeout in seconds | 2 | Amount of time to wait for a query to finish before timing out. `-delim` | Delimiter | , | Delimiter to use `-delimInQuotes` | True/False | false | Are delimiters allowed inside quoted strings? This is more expensive to parse, so we default to false. `-nullString` | Null String | | String to represent NULL data @@ -85,8 +84,10 @@ cassandra-loader -f myFileToLoad.csv -host 1.2.3.4 -schema "test.ltest(a, b, c, `-maxRows` | Max rows to read | -1 | Maximum rows to read (after optional skipping of rows). -1 signifies all rows. `-maxErrors` | Max parse errors | 10 | Maximum number of rows that do not parse to allow before exiting. `-maxInsertErrors`| Max insert errors | 10 | Maximum number of rows that do not insert to allow before exiting. - `-badFile` | Bad File | | File to write out badly parsed rows. + `-badDir` | Bad directory | current directory | Directory to write badly parsed and badly inserted rows - as well as the log file. `-rate` | Ingest rate | unlimited | Maximum rate to insert data - in rows/sec. + `-successDir` | Success directory | | Location to move successfully loaded files + `-failureDir` | Failure directory | | Location to move files that failed to load ## Comments You can send data in on stdin by specifying the filename (via the -f switch) as "stdin" (case insensitive). @@ -107,6 +108,8 @@ place to start. Double quotes will be stripped from all fields if they both begin and end with ". +If you do not set the successDir then files that successfully loaded will remain in their input directory. The same is true for failed files if you do not set the failureDir. You cannot set either if the input file is "stdin". + boolStyle is a case-insensitive test of the True and False strings. For the different styles, the True and False strings are as follows: @@ -125,7 +128,7 @@ different styles, the True and False strings are as follows: Usage: -f -host -schema [OPTIONS] OPTIONS: -delim Delimiter to use [,] - -delmInQuotes true Set to 'true' if delimiter can be inside quoted fields [false] + -delimInQuotes true Set to 'true' if delimiter can be inside quoted fields [false] -dateFormat Date format [default for Locale.ENGLISH] -nullString String that signifies NULL [none] -skipRows Number of rows to skip [0] @@ -143,6 +146,8 @@ OPTIONS: -numRetries Number of times to retry the INSERT [1] -maxInsertErrors <# errors> Maximum INSERT errors to endure [10] -rate Maximum insert rate [50000] + -successDir Directory where to move successfully loaded files + -failureDir Directory where to move files that did not successfully load Examples: diff --git a/build.gradle b/build.gradle index 2266cdf..8dd39b6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'java' apply plugin: 'application' -def versionNum = '0.0.8' +def versionNum = '0.0.9' task loader(type: Exec) { dependsOn << 'uberloader' diff --git a/src/main/java/com/datastax/loader/CqlDelimLoad.java b/src/main/java/com/datastax/loader/CqlDelimLoad.java index a5ede81..a53dbf2 100644 --- a/src/main/java/com/datastax/loader/CqlDelimLoad.java +++ b/src/main/java/com/datastax/loader/CqlDelimLoad.java @@ -21,6 +21,7 @@ import java.lang.System; import java.lang.String; +import java.lang.StringBuilder; import java.lang.Integer; import java.lang.Runtime; import java.util.List; @@ -49,6 +50,10 @@ import java.util.concurrent.Future; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; @@ -62,7 +67,7 @@ import com.google.common.util.concurrent.RateLimiter; public class CqlDelimLoad { - private String version = "0.0.8"; + private String version = "0.0.9"; private String host = null; private int port = 9042; private String username = null; @@ -84,6 +89,9 @@ public class CqlDelimLoad { private long maxRows = -1; private String badDir = "."; private String filename = null; + public static String STDIN = "stdin"; + private String successDir = null; + private String failureDir = null; private Locale locale = null; private BooleanParser.BoolStyle boolStyle = null; @@ -95,33 +103,36 @@ public class CqlDelimLoad { private int numThreads = Runtime.getRuntime().availableProcessors(); private String usage() { - String usage = "version: " + version + "\n"; - usage = usage + "Usage: -f -host -schema [OPTIONS]\n"; - usage = usage + "OPTIONS:\n"; - usage = usage + " -delim Delimiter to use [,]\n"; - usage = usage + " -delimInQuotes true Set to 'true' if delimiter can be inside quoted fields [false]\n"; - usage = usage + " -dateFormat Date format [default for Locale.ENGLISH]\n"; - usage = usage + " -nullString String that signifies NULL [none]\n"; - usage = usage + " -skipRows Number of rows to skip [0]\n"; - usage = usage + " -maxRows Maximum number of rows to read (-1 means all) [-1]\n"; - usage = usage + " -maxErrors Maximum parse errors to endure [10]\n"; - usage = usage + " -badDir Directory for where to place badly parsed rows. [none]\n"; - usage = usage + " -port CQL Port Number [9042]\n"; - usage = usage + " -user Cassandra username [none]\n"; - usage = usage + " -pw Password for user [none]\n"; - usage = usage + " -numFutures Number of CQL futures to keep in flight [1000]\n"; - usage = usage + " -decimalDelim Decimal delimiter [.] Other option is ','\n"; - usage = usage + " -boolStyle Style for booleans [TRUE_FALSE]\n"; - usage = usage + " -numThreads Number of concurrent threads (files) to load [num cores]\n"; - usage = usage + " -queryTimeout <# seconds> Query timeout (in seconds) [2]\n"; - usage = usage + " -numRetries Number of times to retry the INSERT [1]\n"; - usage = usage + " -maxInsertErrors <# errors> Maximum INSERT errors to endure [10]\n"; - usage = usage + " -rate Maximum insert rate [50000]\n"; - usage = usage + "\n\nExamples:\n"; - usage = usage + "cassandra-loader -f /path/to/file.csv -host localhost -schema \"test.test3(a, b, c)\"\n"; - usage = usage + "cassandra-loader -f /path/to/directory -host 1.2.3.4 -schema \"test.test3(a, b, c)\" -delim \"\\t\" -numThreads 10\n"; - usage = usage + "cassandra-loader -f stdin -host localhost -schema \"test.test3(a, b, c)\" -user myuser -pw mypassword\n"; - return usage; + StringBuilder usage = new StringBuilder("version: ").append(version).append("\n"); + usage.append("Usage: -f -host -schema [OPTIONS]\n"); + usage.append("OPTIONS:\n"); + usage.append(" -delim Delimiter to use [,]\n"); + usage.append(" -delimInQuotes true Set to 'true' if delimiter can be inside quoted fields [false]\n"); + usage.append(" -dateFormat Date format [default for Locale.ENGLISH]\n"); + usage.append(" -nullString String that signifies NULL [none]\n"); + usage.append(" -skipRows Number of rows to skip [0]\n"); + usage.append(" -maxRows Maximum number of rows to read (-1 means all) [-1]\n"); + usage.append(" -maxErrors Maximum parse errors to endure [10]\n"); + usage.append(" -badDir Directory for where to place badly parsed rows. [none]\n"); + usage.append(" -port CQL Port Number [9042]\n"); + usage.append(" -user Cassandra username [none]\n"); + usage.append(" -pw Password for user [none]\n"); + usage.append(" -numFutures Number of CQL futures to keep in flight [1000]\n"); + usage.append(" -decimalDelim Decimal delimiter [.] Other option is ','\n"); + usage.append(" -boolStyle Style for booleans [TRUE_FALSE]\n"); + usage.append(" -numThreads Number of concurrent threads (files) to load [num cores]\n"); + usage.append(" -queryTimeout <# seconds> Query timeout (in seconds) [2]\n"); + usage.append(" -numRetries Number of times to retry the INSERT [1]\n"); + usage.append(" -maxInsertErrors <# errors> Maximum INSERT errors to endure [10]\n"); + usage.append(" -rate Maximum insert rate [50000]\n"); + usage.append(" -successDir Directory where to move successfully loaded files\n"); + usage.append(" -failureDir Directory where to move files that did not successfully load\n"); + + usage.append("\n\nExamples:\n"); + usage.append("cassandra-loader -f /path/to/file.csv -host localhost -schema \"test.test3(a, b, c)\"\n"); + usage.append("cassandra-loader -f /path/to/directory -host 1.2.3.4 -schema \"test.test3(a, b, c)\" -delim \"\\t\" -numThreads 10\n"); + usage.append("cassandra-loader -f stdin -host localhost -schema \"test.test3(a, b, c)\" -user myuser -pw mypassword\n"); + return usage.toString(); } private boolean validateArgs() { @@ -157,7 +168,7 @@ private boolean validateArgs() { System.err.println("Number of threads must be non-negative"); return false; } - if (!filename.equalsIgnoreCase("stdin")) { + if (!STDIN.equalsIgnoreCase(filename)) { File infile = new File(filename); if ((!infile.isFile()) && (!infile.isDirectory())) { System.err.println("The -f argument needs to be a file or a directory"); @@ -171,6 +182,28 @@ private boolean validateArgs() { } } } + if (null != successDir) { + if (STDIN.equalsIgnoreCase(filename)) { + System.err.println("Cannot specify -successDir with stdin"); + return false; + } + File sdir = new File(successDir); + if (!sdir.isDirectory()) { + System.err.println("-successDir must be a directory"); + return false; + } + } + if (null != failureDir) { + if (STDIN.equalsIgnoreCase(filename)) { + System.err.println("Cannot specify -failureDir with stdin"); + return false; + } + File sdir = new File(failureDir); + if (!sdir.isDirectory()) { + System.err.println("-failureDir must be a directory"); + return false; + } + } if ((null == username) && (null != password)) { System.err.println("If you supply the password, you must supply the username"); return false; @@ -238,6 +271,8 @@ private boolean parseArgs(String[] args) { if (null != (tkey = amap.remove("-delimInQuotes"))) delimiterInQuotes = Boolean.parseBoolean(tkey); if (null != (tkey = amap.remove("-numThreads"))) numThreads = Integer.parseInt(tkey); if (null != (tkey = amap.remove("-rate"))) rate = Double.parseDouble(tkey); + if (null != (tkey = amap.remove("-successDir"))) successDir = tkey; + if (null != (tkey = amap.remove("-failureDir"))) failureDir = tkey; if (null != (tkey = amap.remove("-decimalDelim"))) { if (tkey.equals(",")) locale = Locale.FRANCE; @@ -304,20 +339,21 @@ public boolean run(String[] args) throws IOException, ParseException, Interrupte BufferedReader reader = null; String readerName = ""; Deque fileList = new ArrayDeque(); - if (filename.equalsIgnoreCase("stdin")) { - reader = new BufferedReader(new InputStreamReader(System.in)); - readerName = "stdin"; + File infile = null; + File[] inFileList = null; + boolean onefile = true; + if (STDIN.equalsIgnoreCase(filename)) { + infile = null; } else { - File infile = new File(filename); + infile = new File(filename); if (infile.isFile()) { - reader = new BufferedReader(new FileReader(infile)); - readerName = infile.getName(); } else { - File[] inFileList = infile.listFiles(); + inFileList = infile.listFiles(); if (inFileList.length < 1) throw new IOException("directory is empty"); + onefile = false; Arrays.sort(inFileList, new Comparator() { public int compare(File f1, File f2) { @@ -332,7 +368,7 @@ public int compare(File f1, File f2) { // Launch Threads ExecutorService executor; long total = 0; - if (null != reader) { + if (onefile) { // One file/stdin to process executor = Executors.newSingleThreadExecutor(); Callable worker = new ThreadExecute(cqlSchema, delimiter, @@ -341,11 +377,12 @@ public int compare(File f1, File f2) { dateFormatString, boolStyle, locale, maxErrors, skipRows, - maxRows, badDir, reader, - readerName, + maxRows, badDir, infile, session, numFutures, numRetries, queryTimeout, - maxInsertErrors, rateLimiter); + maxInsertErrors, + rateLimiter, + successDir, failureDir); Future res = executor.submit(worker); total = res.get(); executor.shutdown(); @@ -355,20 +392,20 @@ public int compare(File f1, File f2) { Set> results = new HashSet>(); while (!fileList.isEmpty()) { File tFile = fileList.pop(); - BufferedReader r = new BufferedReader(new FileReader(tFile)); - String rName = tFile.getName(); Callable worker = new ThreadExecute(cqlSchema, delimiter, nullString, delimiterInQuotes, dateFormatString, boolStyle, locale, maxErrors, skipRows, - maxRows, badDir, r, - rName, session, + maxRows, badDir, tFile, + session, numFutures, numRetries, queryTimeout, - maxInsertErrors, rateLimiter); + maxInsertErrors, + rateLimiter, + successDir, failureDir); results.add(executor.submit(worker)); } executor.shutdown(); @@ -385,7 +422,12 @@ public int compare(File f1, File f2) { public static void main(String[] args) throws IOException, ParseException, InterruptedException, ExecutionException { CqlDelimLoad cdl = new CqlDelimLoad(); - cdl.run(args); + boolean success = cdl.run(args); + if (success) { + System.exit(0); + } else { + System.exit(-1); + } } class ThreadExecute implements Callable { @@ -400,11 +442,14 @@ class ThreadExecute implements Callable { private long skipRows; private long maxRows; private String badDir; + private String successDir; + private String failureDir; + private String readerName; private PrintStream badParsePrinter = null; private PrintStream badInsertPrinter = null; private PrintStream logPrinter = null; private BufferedReader reader; - private String readerName; + private File infile; private int numFutures; private long numInserted; private final RateLimiter rateLimiter; @@ -427,11 +472,12 @@ public ThreadExecute(String inCqlSchema, String inDelimiter, BooleanParser.BoolStyle inBoolStyle, Locale inLocale, long inMaxErrors, long inSkipRows, long inMaxRows, - String inBadDir, BufferedReader inReader, - String inReaderName, Session inSession, + String inBadDir, File inFile, + Session inSession, int inNumFutures, int inNumRetries, int inQueryTimeout, long inMaxInsertErrors, - RateLimiter inRateLimiter) { + RateLimiter inRateLimiter, + String inSuccessDir, String inFailureDir) { super(); cqlSchema = inCqlSchema; delimiter = inDelimiter; @@ -444,35 +490,39 @@ public ThreadExecute(String inCqlSchema, String inDelimiter, skipRows = inSkipRows; maxRows = inMaxRows; badDir = inBadDir; - reader = inReader; - readerName = inReaderName; + infile = inFile; session = inSession; numFutures = inNumFutures; numRetries = inNumRetries; queryTimeout = inQueryTimeout; maxInsertErrors = inMaxInsertErrors; rateLimiter = inRateLimiter; + successDir = inSuccessDir; + failureDir = inFailureDir; } public Long call() throws IOException, ParseException { setup(); numInserted = execute(); - cleanup(); return numInserted; } private void setup() throws IOException, ParseException { + if (null == infile) { + reader = new BufferedReader(new InputStreamReader(System.in)); + //readerName = CqlDelimLoad.STDIN; + readerName = "stdin"; + } + else { + reader = new BufferedReader(new FileReader(infile)); + readerName = infile.getName(); + } + // Prepare Badfile if (null != badDir) { - badParsePrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(badDir + "/" - + readerName - + BADPARSE))); - badInsertPrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(badDir + "/" - + readerName - + BADINSERT))); - logPrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(badDir + "/" - + readerName - + LOG))); + badParsePrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(badDir + "/" + readerName + BADPARSE))); + badInsertPrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(badDir + "/" + readerName + BADINSERT))); + logPrinter = new PrintStream(new BufferedOutputStream(new FileOutputStream(badDir + "/" + readerName + LOG))); } cdp = new CqlDelimParser(cqlSchema, delimiter, nullString, @@ -483,13 +533,29 @@ private void setup() throws IOException, ParseException { statement.setRetryPolicy(new LoaderRetryPolicy(numRetries)); } - private void cleanup() throws IOException { + private void cleanup(boolean success) throws IOException { if (null != badParsePrinter) badParsePrinter.close(); if (null != badInsertPrinter) badInsertPrinter.close(); if (null != logPrinter) logPrinter.close(); + if (success) { + if (null != successDir) { + Path src = infile.toPath(); + Path dst = Paths.get(successDir); + Files.move(src, dst.resolve(src.getFileName()), + StandardCopyOption.REPLACE_EXISTING); + } + } + else { + if (null != failureDir) { + Path src = infile.toPath(); + Path dst = Paths.get(failureDir); + Files.move(src, dst.resolve(src.getFileName()), + StandardCopyOption.REPLACE_EXISTING); + } + } } private long execute() throws IOException { @@ -520,7 +586,7 @@ private long execute() throws IOException { rateLimiter.acquire(1); ResultSetFuture resultSetFuture = session.executeAsync(bind); if (!fm.add(resultSetFuture, line)) { - cleanup(); + cleanup(false); return -2; } numInserted++; @@ -539,13 +605,13 @@ private long execute() throws IOException { logPrinter.println(String.format("Maximum number of errors exceeded (%d) for %s", numErrors, readerName)); } System.err.println(String.format("Maximum number of errors exceeded (%d) for %s", numErrors, readerName)); - cleanup(); + cleanup(false); return -1; } } } if (!fm.cleanup()) { - cleanup(); + cleanup(false); return -1; } @@ -554,7 +620,7 @@ private long execute() throws IOException { } System.err.println("*** DONE: " + readerName + " number of lines processed: " + lineNumber + " (" + numInserted + " inserted)"); - cleanup(); + cleanup(true); return fm.getNumInserted(); } } diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java b/src/main/java/com/datastax/loader/CqlDelimUnload.java index 6880c5f..07d3ce1 100644 --- a/src/main/java/com/datastax/loader/CqlDelimUnload.java +++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java @@ -57,7 +57,7 @@ public class CqlDelimUnload { - private String version = "0.0.8"; + private String version = "0.0.9"; private String host = null; private int port = 9042; private String username = null; @@ -308,7 +308,12 @@ public boolean run(String[] args) throws IOException, ParseException, public static void main(String[] args) throws IOException, ParseException, InterruptedException, ExecutionException { CqlDelimUnload cdu = new CqlDelimUnload(); - cdu.run(args); + boolean success = cdu.run(args); + if (success) { + System.exit(0); + } else { + System.exit(-1); + } } class ThreadExecute implements Callable { diff --git a/src/main/java/com/datastax/loader/parser/BooleanParser.java b/src/main/java/com/datastax/loader/parser/BooleanParser.java index df77a9c..3367592 100644 --- a/src/main/java/com/datastax/loader/parser/BooleanParser.java +++ b/src/main/java/com/datastax/loader/parser/BooleanParser.java @@ -24,11 +24,35 @@ // Boolean parser - handles any way that Booleans can be expressed in Java public class BooleanParser implements Parser { - public static enum BoolStyle { BoolStyle_TrueFalse, - BoolStyle_10, - BoolStyle_TF, - BoolStyle_YN, - BoolStyle_YesNo } + public static enum BoolStyle { + BoolStyle_TrueFalse("TRUE_FALSE", "TRUE", "FALSE"), + BoolStyle_10("1_0", "1", "0"), + BoolStyle_TF("T_F", "T", "F"), + BoolStyle_YN("Y_N", "Y", "N"), + BoolStyle_YesNo("YES_NO", "YES", "NO"); + + private String styleStr; + private String trueStr; + private String falseStr; + + BoolStyle(String inStyleStr, String inTrueStr, String inFalseStr) { + styleStr = inStyleStr; + trueStr = inTrueStr; + falseStr = inFalseStr; + } + + public String getStyle() { + return styleStr; + } + + public String getTrueStr() { + return trueStr; + } + + public String getFalseStr() { + return falseStr; + } + } private String boolTrue; private String boolFalse; @@ -43,32 +67,10 @@ public BooleanParser() { } public BooleanParser(BoolStyle inBoolStyle) { - boolTrue = "TRUE"; - boolFalse = "FALSE"; if (null == inBoolStyle) inBoolStyle = BoolStyle.BoolStyle_TrueFalse; - switch (inBoolStyle) { - case BoolStyle_10: - boolTrue = "1"; - boolFalse = "0"; - break; - case BoolStyle_TF: - boolTrue = "T"; - boolFalse = "F"; - break; - case BoolStyle_YN: - boolTrue = "Y"; - boolFalse = "N"; - break; - case BoolStyle_YesNo: - boolTrue = "YES"; - boolFalse = "NO"; - break; - default: - boolTrue = "TRUE"; - boolFalse = "FALSE"; - break; - } + boolTrue = inBoolStyle.getTrueStr(); + boolFalse = inBoolStyle.getFalseStr(); } public BooleanParser(String inBoolTrue, String inBoolFalse) { @@ -77,16 +79,11 @@ public BooleanParser(String inBoolTrue, String inBoolFalse) { } public static BoolStyle getBoolStyle(String instr) { - if (BOOLSTYLE_1_0.equalsIgnoreCase(instr)) - return BoolStyle.BoolStyle_10; - if (BOOLSTYLE_T_F.equalsIgnoreCase(instr)) - return BoolStyle.BoolStyle_TF; - if (BOOLSTYLE_Y_N.equalsIgnoreCase(instr)) - return BoolStyle.BoolStyle_YN; - if (BOOLSTYLE_YES_NO.equalsIgnoreCase(instr)) - return BoolStyle.BoolStyle_YesNo; - if (BOOLSTYLE_TRUE_FALSE.equalsIgnoreCase(instr)) - return BoolStyle.BoolStyle_TrueFalse; + for (BoolStyle bs : BoolStyle.values()) { + if (bs.getStyle().equalsIgnoreCase(instr)) { + return bs; + } + } return null; }