diff --git a/ChangeLog.md b/ChangeLog.md index da1e70a..28212c9 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,7 @@ +## 0.0.12 +- Added a rateFile to output CSV rate statistics +- added -skipCols to skip input columns + ## 0.0.11 - Added support for quoted Keyspaces, Tables, and Columns diff --git a/README.md b/README.md index e05cf88..c4abde6 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,11 @@ loading of various types of delimited files, including ### Downloading This utility has already been built, and is available at -https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.11/cassandra-loader +https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.12/cassandra-loader Get it with wget: ``` -wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.11/cassandra-loader +wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.12/cassandra-loader ``` ### Building @@ -82,12 +82,14 @@ cassandra-loader -f myFileToLoad.csv -host 1.2.3.4 -schema "test.ltest(a, b, c, `-decimalDelim` | Decimal delimiter | . | Delimiter for decimal values. Options are "." or "," `-dateFormat` | Date Format String | default for Locale.ENGLISH | Date format string as specified in the SimpleDateFormat Java class: http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html `-skipRows` | Rows to skip | 0 | Number of rows to skip at the beginning of the file + `-skipCols` | Columns to skip | | Comma-separated list of columns to skip loading (0-counted) `-maxRows` | Max rows to read | -1 | Maximum rows to read (after optional skipping of rows). -1 signifies all rows. `-maxErrors` | Max parse errors | 10 | Maximum number of rows that do not parse to allow before exiting. `-maxInsertErrors`| Max insert errors | 10 | Maximum number of rows that do not insert to allow before exiting. `-badDir` | Bad directory | current directory | Directory to write badly parsed and badly inserted rows - as well as the log file. `-rate` | Ingest rate | unlimited | Maximum rate to insert data - in rows/sec. `-progressRate` | Progress rate | 100000 | How often to report the ingest rate (number of rows) + `-rateFile` | Rate Stats File | | File to contain CSV rate statistics `-successDir` | Success directory | | Location to move successfully loaded files `-failureDir` | Failure directory | | Location to move files that failed to load diff --git a/build.gradle b/build.gradle index 7487f7f..a63e5d1 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'java' apply plugin: 'application' -def versionNum = '0.0.11' +def versionNum = '0.0.12' task loader(type: Exec) { dependsOn << 'uberloader' diff --git a/src/main/java/com/datastax/loader/CqlDelimLoad.java b/src/main/java/com/datastax/loader/CqlDelimLoad.java index cf21b72..686300f 100644 --- a/src/main/java/com/datastax/loader/CqlDelimLoad.java +++ b/src/main/java/com/datastax/loader/CqlDelimLoad.java @@ -66,7 +66,7 @@ import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; public class CqlDelimLoad { - private String version = "0.0.11"; + private String version = "0.0.12"; private String host = null; private int port = 9042; private String username = null; @@ -82,15 +82,20 @@ public class CqlDelimLoad { private double rate = 50000.0; private long progressRate = 100000; private RateLimiter rateLimiter = null; + private String rateFile = null; + private PrintStream rateStream = null; private String cqlSchema = null; private long maxErrors = 10; private long skipRows = 0; + private String skipCols = null; + private long maxRows = -1; private String badDir = "."; private String filename = null; public static String STDIN = "stdin"; + public static String STDERR = "stderr"; private String successDir = null; private String failureDir = null; @@ -110,6 +115,7 @@ private String usage() { usage.append(" -dateFormat Date format [default for Locale.ENGLISH]\n"); usage.append(" -nullString String that signifies NULL [none]\n"); usage.append(" -skipRows Number of rows to skip [0]\n"); + usage.append(" -skipCOls Comma-separated list of columsn to skip in the input file\n"); usage.append(" -maxRows Maximum number of rows to read (-1 means all) [-1]\n"); usage.append(" -maxErrors Maximum parse errors to endure [10]\n"); usage.append(" -badDir Directory for where to place badly parsed rows. [none]\n"); @@ -125,7 +131,8 @@ private String usage() { usage.append(" -numRetries Number of times to retry the INSERT [1]\n"); usage.append(" -maxInsertErrors <# errors> Maximum INSERT errors to endure [10]\n"); usage.append(" -rate Maximum insert rate [50000]\n"); - usage.append(" -progressRate How often to report the insert rate\n"); + usage.append(" -progressRate How often to report the insert rate [100000]\n"); + usage.append(" -rateFile Where to print the rate statistics\n"); usage.append(" -successDir Directory where to move successfully loaded files\n"); usage.append(" -failureDir Directory where to move files that did not successfully load\n"); @@ -269,6 +276,7 @@ private boolean parseArgs(String[] args) { if (null != (tkey = amap.remove("-numRetries"))) numRetries = Integer.parseInt(tkey); if (null != (tkey = amap.remove("-maxErrors"))) maxErrors = Long.parseLong(tkey); if (null != (tkey = amap.remove("-skipRows"))) skipRows = Integer.parseInt(tkey); + if (null != (tkey = amap.remove("-skipCols"))) skipCols = tkey; if (null != (tkey = amap.remove("-maxRows"))) maxRows = Integer.parseInt(tkey); if (null != (tkey = amap.remove("-badDir"))) badDir = tkey; if (null != (tkey = amap.remove("-dateFormat"))) dateFormatString = tkey; @@ -277,6 +285,7 @@ private boolean parseArgs(String[] args) { if (null != (tkey = amap.remove("-numThreads"))) numThreads = Integer.parseInt(tkey); if (null != (tkey = amap.remove("-rate"))) rate = Double.parseDouble(tkey); if (null != (tkey = amap.remove("-progressRate"))) progressRate = Long.parseLong(tkey); + if (null != (tkey = amap.remove("-rateFile"))) rateFile = tkey; if (null != (tkey = amap.remove("-successDir"))) successDir = tkey; if (null != (tkey = amap.remove("-failureDir"))) failureDir = tkey; if (null != (tkey = amap.remove("-decimalDelim"))) { @@ -309,7 +318,7 @@ private boolean parseArgs(String[] args) { return validateArgs(); } - private void setup() { + private void setup() throws IOException { // Connect to Cassandra Cluster.Builder clusterBuilder = Cluster.builder() .addContactPoint(host) @@ -319,12 +328,23 @@ private void setup() { if (null != username) clusterBuilder = clusterBuilder.withCredentials(username, password); cluster = clusterBuilder.build(); - rateLimiter = new RateLimiter(rate, progressRate); + + if (null != rateFile) { + if (STDERR.equalsIgnoreCase(rateFile)) { + rateStream = System.err; + } + else { + rateStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(rateFile)), true); + } + } + rateLimiter = new RateLimiter(rate, progressRate, cluster.getMetrics().getRequestsTimer(), rateStream); //rateLimiter = new Latency999RateLimiter(rate, progressRate, 3000, 200, 10, 0.5, 0.1, cluster, false); session = new RateLimitedSession(cluster.newSession(), rateLimiter); } private void cleanup() { + rateLimiter.report(null, null); + rateStream.close(); if (null != session) session.close(); if (null != cluster) @@ -382,6 +402,7 @@ public int compare(File f1, File f2) { dateFormatString, boolStyle, locale, maxErrors, skipRows, + skipCols, maxRows, badDir, infile, session, consistencyLevel, numFutures, @@ -402,6 +423,7 @@ public int compare(File f1, File f2) { dateFormatString, boolStyle, locale, maxErrors, skipRows, + skipCols, maxRows, badDir, tFile, session, consistencyLevel, @@ -416,10 +438,10 @@ public int compare(File f1, File f2) { for (Future res : results) total += res.get(); } - System.err.println("Total rows inserted: " + total); // Cleanup cleanup(); + //System.err.println("Total rows inserted: " + total); return true; } diff --git a/src/main/java/com/datastax/loader/CqlDelimLoadTask.java b/src/main/java/com/datastax/loader/CqlDelimLoadTask.java index bd23171..327e3b3 100644 --- a/src/main/java/com/datastax/loader/CqlDelimLoadTask.java +++ b/src/main/java/com/datastax/loader/CqlDelimLoadTask.java @@ -76,6 +76,7 @@ class CqlDelimLoadTask implements Callable { private CqlDelimParser cdp; private long maxErrors; private long skipRows; + private String skipCols = null; private long maxRows; private String badDir; private String successDir; @@ -105,7 +106,8 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter, String inNullString, String inDateFormatString, BooleanParser.BoolStyle inBoolStyle, Locale inLocale, - long inMaxErrors, long inSkipRows, long inMaxRows, + long inMaxErrors, long inSkipRows, + String inSkipCols, long inMaxRows, String inBadDir, File inFile, Session inSession, ConsistencyLevel inCl, int inNumFutures, int inNumRetries, @@ -120,6 +122,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter, locale = inLocale; maxErrors = inMaxErrors; skipRows = inSkipRows; + skipCols = inSkipCols; maxRows = inMaxRows; badDir = inBadDir; infile = inFile; @@ -159,7 +162,7 @@ private void setup() throws IOException, ParseException { cdp = new CqlDelimParser(cqlSchema, delimiter, nullString, dateFormatString, boolStyle, locale, - session); + skipCols, session); insert = cdp.generateInsert(); statement = session.prepare(insert); statement.setRetryPolicy(new LoaderRetryPolicy(numRetries)); diff --git a/src/main/java/com/datastax/loader/CqlDelimParser.java b/src/main/java/com/datastax/loader/CqlDelimParser.java index d68004c..db18402 100644 --- a/src/main/java/com/datastax/loader/CqlDelimParser.java +++ b/src/main/java/com/datastax/loader/CqlDelimParser.java @@ -33,16 +33,18 @@ import com.datastax.loader.parser.SetParser; import com.datastax.loader.parser.MapParser; +import java.lang.String; +import java.lang.IndexOutOfBoundsException; +import java.lang.NumberFormatException; +import java.text.ParseException; import java.util.Map; import java.util.List; import java.util.HashMap; import java.util.ArrayList; -import java.lang.String; -import java.text.ParseException; import java.util.Locale; import java.util.regex.Pattern; import java.util.regex.Matcher; -import java.lang.IndexOutOfBoundsException; + import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ColumnDefinitions; @@ -56,28 +58,15 @@ public class CqlDelimParser { private String tablename; private DelimParser delimParser; - public CqlDelimParser(String inCqlSchema, Session session) - throws ParseException { - // Must supply a CQL schema - this(inCqlSchema, null, null, session); - } - - public CqlDelimParser(String inCqlSchema, String inDelimiter, - String inNullString, Session session) - throws ParseException { - // Optionally provide things for the DelimParser - delmiter, null string - this(inCqlSchema, inDelimiter, inNullString, null, null, null, session); - } - public CqlDelimParser(String inCqlSchema, String inDelimiter, String inNullString, String inDateFormatString, BooleanParser.BoolStyle inBoolStyle, Locale inLocale, - Session session) + String skipList, Session session) throws ParseException { // Optionally provide things for the line parser - date format, boolean format, locale initPmap(inDateFormatString, inBoolStyle, inLocale); processCqlSchema(inCqlSchema, session); - createDelimParser(inDelimiter, inNullString); + createDelimParser(inDelimiter, inNullString, skipList); } // used internally to store schema information @@ -198,10 +187,16 @@ else if (sb.datatype == DataType.Name.MAP) { } // Creates the DelimParser that will parse the line - private void createDelimParser(String delimiter, String nullString) { + private void createDelimParser(String delimiter, String nullString, + String skipList) throws NumberFormatException { delimParser = new DelimParser(delimiter, nullString); for (int i = 0; i < sbl.size(); i++) delimParser.add(sbl.get(i).parser); + if (null != skipList) { + for (String s : skipList.split(",")) { + delimParser.addSkip(Integer.parseInt(s.trim())); + } + } } // Convenience method to return the INSERT statement for a PreparedStatement. diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java b/src/main/java/com/datastax/loader/CqlDelimUnload.java index e73a13e..c822491 100644 --- a/src/main/java/com/datastax/loader/CqlDelimUnload.java +++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java @@ -57,7 +57,7 @@ public class CqlDelimUnload { - private String version = "0.0.11"; + private String version = "0.0.12"; private String host = null; private int port = 9042; private String username = null; @@ -395,7 +395,7 @@ private String getPartitionKey(CqlDelimParser cdp, Session tsession) { private void setup() throws IOException, ParseException { cdp = new CqlDelimParser(cqlSchema, delimiter, nullString, dateFormatString, - boolStyle, locale, session); + boolStyle, locale, null, session); String select = cdp.generateSelect(); String partitionKey = getPartitionKey(cdp, session); if (null != beginToken) { diff --git a/src/main/java/com/datastax/loader/RateLimiter.java b/src/main/java/com/datastax/loader/RateLimiter.java index d95285c..917e010 100644 --- a/src/main/java/com/datastax/loader/RateLimiter.java +++ b/src/main/java/com/datastax/loader/RateLimiter.java @@ -15,13 +15,18 @@ */ package com.datastax.loader; +import java.io.PrintStream; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Snapshot; public class RateLimiter { private com.google.common.util.concurrent.RateLimiter rateLimiter; private AtomicLong numAcquires; private static long updateRate = 100000; + private Timer timer; + private PrintStream stream; private long lastVal; private long firstTime; private long lastTime; @@ -31,18 +36,63 @@ public RateLimiter(double inRate) { } public RateLimiter(double inRate, long inUpdateRate) { + this(inRate, inUpdateRate, null, null); + } + + public RateLimiter(double inRate, long inUpdateRate, + Timer inTimer, PrintStream inStream) { rateLimiter = com.google.common.util.concurrent.RateLimiter.create(inRate); updateRate = inUpdateRate; + timer = inTimer; + stream = inStream; + if ((null != stream) && (null != timer)) { + printHeader(); + } numAcquires = new AtomicLong(0); lastTime = System.currentTimeMillis(); firstTime = lastTime; lastVal = 0; } - protected synchronized void incrementAndReport(int permits) { - long currentVal = numAcquires.addAndGet(permits); - long currentTime = System.currentTimeMillis(); - if (0 == currentVal % updateRate) { + protected void printHeader() { + stream.println("Count,Min,Max,Mean,StdDev,50th,75th,95th,98th,99th,999th,MeanRate,1MinuteRate,5MinuteRate,15MinuteRate"); + } + + protected void printStats() { + Snapshot snapshot = timer.getSnapshot(); + stream.println(String.format("%d,%d,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f", + timer.getCount(), + snapshot.getMin(), + snapshot.getMax(), + snapshot.getMean(), + snapshot.getStdDev(), + snapshot.getMedian(), + snapshot.get75thPercentile(), + snapshot.get95thPercentile(), + snapshot.get98thPercentile(), + snapshot.get99thPercentile(), + snapshot.get999thPercentile(), + timer.getMeanRate(), + timer.getOneMinuteRate(), + timer.getFiveMinuteRate(), + timer.getFifteenMinuteRate()) + ); + } + + public void report(Long currentVal, Long currentTime) { + if ((null != stream) && (null != timer)) { + printStats(); + } + if (null == currentVal) { + currentVal = numAcquires.get() - 1; + currentTime = System.currentTimeMillis(); + System.err.println("Lines Processed: \t" + currentVal + + " Rate: \t" + + (currentVal) + /((currentTime - firstTime) / 1000) + ); + } + else { System.err.println("Lines Processed: \t" + currentVal + " Rate: \t" + (currentVal) @@ -52,6 +102,14 @@ protected synchronized void incrementAndReport(int permits) { / ((currentTime - lastTime) / 1000) + ")" ); + } + } + + protected synchronized void incrementAndReport(int permits) { + long currentVal = numAcquires.addAndGet(permits); + long currentTime = System.currentTimeMillis(); + if (0 == currentVal % updateRate) { + report(currentVal, currentTime); lastTime = currentTime; lastVal = currentVal; } diff --git a/src/main/java/com/datastax/loader/parser/DelimParser.java b/src/main/java/com/datastax/loader/parser/DelimParser.java index 86dddc0..216591d 100644 --- a/src/main/java/com/datastax/loader/parser/DelimParser.java +++ b/src/main/java/com/datastax/loader/parser/DelimParser.java @@ -39,6 +39,7 @@ public class DelimParser { private char delim; private char quote; private char escape; + private List skip; public static String DEFAULT_DELIMITER = ","; public static String DEFAULT_NULLSTRING = ""; @@ -54,6 +55,7 @@ public DelimParser(String inDelimiter) { public DelimParser(String inDelimiter, String inNullString) { parsers = new ArrayList(); elements = new ArrayList(); + skip = new ArrayList(); parsersSize = parsers.size(); if (null == inDelimiter) delimiter = DEFAULT_DELIMITER; @@ -71,12 +73,13 @@ public DelimParser(String inDelimiter, String inNullString) { // Adds a parser to the list public void add(Parser p) { parsers.add(p); + skip.add(false); parsersSize = parsers.size(); } - // Adds a collection of parsers to the list - public void add(Collection pl) { - parsers.addAll(pl); + public void addSkip(int idx) { + parsers.add(idx, new StringParser()); + skip.add(idx, true); parsersSize = parsers.size(); } @@ -99,8 +102,11 @@ public List parseComplex(String line) { IndexedLine sr = new IndexedLine(line); for (int i = 0; i < parsersSize; i++) { try { - elements.add(parsers.get(i).parse(sr, nullString, delim, escape, - quote, (parsersSize-1 == i))); + Object toAdd = parsers.get(i).parse(sr, nullString, delim, + escape, quote, + (parsersSize-1 == i)); + if (!skip.get(i)) + elements.add(toAdd); } catch (NumberFormatException e) { System.err.println(String.format("Invalid number in input number %d: %s", i, e.getMessage()));