Skip to content

Commit

Permalink
added rateFile and skipCols
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmhess committed May 15, 2015
1 parent c4a14b8 commit 36fc257
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 40 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.12
- Added a rateFile to output CSV rate statistics
- Added -skipCols to skip input columns

## 0.0.11
- Added support for quoted Keyspaces, Tables, and Columns

Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ loading of various types of delimited files, including

### Downloading
This utility has already been built, and is available at
https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.11/cassandra-loader
https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.12/cassandra-loader

Get it with wget:
```
wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.11/cassandra-loader
wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.12/cassandra-loader
```

### Building
Expand Down Expand Up @@ -82,12 +82,14 @@ cassandra-loader -f myFileToLoad.csv -host 1.2.3.4 -schema "test.ltest(a, b, c,
`-decimalDelim` | Decimal delimiter | . | Delimiter for decimal values. Options are "." or ","
`-dateFormat` | Date Format String | default for Locale.ENGLISH | Date format string as specified in the SimpleDateFormat Java class: http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
`-skipRows` | Rows to skip | 0 | Number of rows to skip at the beginning of the file
`-skipCols` | Columns to skip | <not set> | Comma-separated list of input columns to skip when loading (0-indexed)
`-maxRows` | Max rows to read | -1 | Maximum rows to read (after optional skipping of rows). -1 signifies all rows.
`-maxErrors` | Max parse errors | 10 | Maximum number of rows that do not parse to allow before exiting.
`-maxInsertErrors`| Max insert errors | 10 | Maximum number of rows that do not insert to allow before exiting.
`-badDir` | Bad directory | current directory | Directory to write badly parsed and badly inserted rows - as well as the log file.
`-rate` | Ingest rate | unlimited | Maximum rate to insert data - in rows/sec.
`-progressRate` | Progress rate | 100000 | How often to report the ingest rate (number of rows)
`-rateFile` | Rate Stats File | <not set> | File to contain CSV rate statistics
`-successDir` | Success directory | <not set> | Location to move successfully loaded files
`-failureDir` | Failure directory | <not set> | Location to move files that failed to load

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apply plugin: 'java'
apply plugin: 'application'

def versionNum = '0.0.11'
def versionNum = '0.0.12'

task loader(type: Exec) {
dependsOn << 'uberloader'
Expand Down
32 changes: 27 additions & 5 deletions src/main/java/com/datastax/loader/CqlDelimLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

public class CqlDelimLoad {
private String version = "0.0.11";
private String version = "0.0.12";
private String host = null;
private int port = 9042;
private String username = null;
Expand All @@ -82,15 +82,20 @@ public class CqlDelimLoad {
private double rate = 50000.0;
private long progressRate = 100000;
private RateLimiter rateLimiter = null;
private String rateFile = null;
private PrintStream rateStream = null;

private String cqlSchema = null;

private long maxErrors = 10;
private long skipRows = 0;
private String skipCols = null;

private long maxRows = -1;
private String badDir = ".";
private String filename = null;
public static String STDIN = "stdin";
public static String STDERR = "stderr";
private String successDir = null;
private String failureDir = null;

Expand All @@ -110,6 +115,7 @@ private String usage() {
usage.append(" -dateFormat <dateFormatString> Date format [default for Locale.ENGLISH]\n");
usage.append(" -nullString <nullString> String that signifies NULL [none]\n");
usage.append(" -skipRows <skipRows> Number of rows to skip [0]\n");
// The flag shown here must match the "-skipCols" key that parseArgs() looks up.
usage.append(" -skipCols <columnsToSkip> Comma-separated list of columns to skip in the input file\n");
usage.append(" -maxRows <maxRows> Maximum number of rows to read (-1 means all) [-1]\n");
usage.append(" -maxErrors <maxErrors> Maximum parse errors to endure [10]\n");
usage.append(" -badDir <badDirectory> Directory for where to place badly parsed rows. [none]\n");
Expand All @@ -125,7 +131,8 @@ private String usage() {
usage.append(" -numRetries <numRetries> Number of times to retry the INSERT [1]\n");
usage.append(" -maxInsertErrors <# errors> Maximum INSERT errors to endure [10]\n");
usage.append(" -rate <rows-per-second> Maximum insert rate [50000]\n");
usage.append(" -progressRate <num txns> How often to report the insert rate\n");
usage.append(" -progressRate <num txns> How often to report the insert rate [100000]\n");
usage.append(" -rateFile <filename> Where to print the rate statistics\n");
usage.append(" -successDir <dir> Directory where to move successfully loaded files\n");
usage.append(" -failureDir <dir> Directory where to move files that did not successfully load\n");

Expand Down Expand Up @@ -269,6 +276,7 @@ private boolean parseArgs(String[] args) {
if (null != (tkey = amap.remove("-numRetries"))) numRetries = Integer.parseInt(tkey);
if (null != (tkey = amap.remove("-maxErrors"))) maxErrors = Long.parseLong(tkey);
if (null != (tkey = amap.remove("-skipRows"))) skipRows = Integer.parseInt(tkey);
if (null != (tkey = amap.remove("-skipCols"))) skipCols = tkey;
if (null != (tkey = amap.remove("-maxRows"))) maxRows = Integer.parseInt(tkey);
if (null != (tkey = amap.remove("-badDir"))) badDir = tkey;
if (null != (tkey = amap.remove("-dateFormat"))) dateFormatString = tkey;
Expand All @@ -277,6 +285,7 @@ private boolean parseArgs(String[] args) {
if (null != (tkey = amap.remove("-numThreads"))) numThreads = Integer.parseInt(tkey);
if (null != (tkey = amap.remove("-rate"))) rate = Double.parseDouble(tkey);
if (null != (tkey = amap.remove("-progressRate"))) progressRate = Long.parseLong(tkey);
if (null != (tkey = amap.remove("-rateFile"))) rateFile = tkey;
if (null != (tkey = amap.remove("-successDir"))) successDir = tkey;
if (null != (tkey = amap.remove("-failureDir"))) failureDir = tkey;
if (null != (tkey = amap.remove("-decimalDelim"))) {
Expand Down Expand Up @@ -309,7 +318,7 @@ private boolean parseArgs(String[] args) {
return validateArgs();
}

private void setup() {
private void setup() throws IOException {
// Connect to Cassandra
Cluster.Builder clusterBuilder = Cluster.builder()
.addContactPoint(host)
Expand All @@ -319,12 +328,23 @@ private void setup() {
if (null != username)
clusterBuilder = clusterBuilder.withCredentials(username, password);
cluster = clusterBuilder.build();
rateLimiter = new RateLimiter(rate, progressRate);

if (null != rateFile) {
if (STDERR.equalsIgnoreCase(rateFile)) {
rateStream = System.err;
}
else {
rateStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(rateFile)), true);
}
}
rateLimiter = new RateLimiter(rate, progressRate, cluster.getMetrics().getRequestsTimer(), rateStream);
//rateLimiter = new Latency999RateLimiter(rate, progressRate, 3000, 200, 10, 0.5, 0.1, cluster, false);
session = new RateLimitedSession(cluster.newSession(), rateLimiter);
}

/**
 * Emits the final rate report and releases the resources acquired in setup().
 *
 * Every resource is null-checked before use: rateStream is only assigned
 * when -rateFile was supplied, so it is null on an ordinary run.
 */
private void cleanup() {
    if (null != rateLimiter)
        rateLimiter.report(null, null);
    if (null != rateStream) {
        if (System.err == rateStream) {
            // "-rateFile stderr" aliases System.err; flush it but never close
            // the process-wide error stream.
            rateStream.flush();
        }
        else {
            rateStream.close();
        }
    }
if (null != session)
session.close();
if (null != cluster)
Expand Down Expand Up @@ -382,6 +402,7 @@ public int compare(File f1, File f2) {
dateFormatString,
boolStyle, locale,
maxErrors, skipRows,
skipCols,
maxRows, badDir, infile,
session, consistencyLevel,
numFutures,
Expand All @@ -402,6 +423,7 @@ public int compare(File f1, File f2) {
dateFormatString,
boolStyle, locale,
maxErrors, skipRows,
skipCols,
maxRows, badDir, tFile,
session,
consistencyLevel,
Expand All @@ -416,10 +438,10 @@ public int compare(File f1, File f2) {
for (Future<Long> res : results)
total += res.get();
}
System.err.println("Total rows inserted: " + total);

// Cleanup
cleanup();
//System.err.println("Total rows inserted: " + total);

return true;
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/datastax/loader/CqlDelimLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class CqlDelimLoadTask implements Callable<Long> {
private CqlDelimParser cdp;
private long maxErrors;
private long skipRows;
private String skipCols = null;
private long maxRows;
private String badDir;
private String successDir;
Expand Down Expand Up @@ -105,7 +106,8 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
String inNullString, String inDateFormatString,
BooleanParser.BoolStyle inBoolStyle,
Locale inLocale,
long inMaxErrors, long inSkipRows, long inMaxRows,
long inMaxErrors, long inSkipRows,
String inSkipCols, long inMaxRows,
String inBadDir, File inFile,
Session inSession, ConsistencyLevel inCl,
int inNumFutures, int inNumRetries,
Expand All @@ -120,6 +122,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
locale = inLocale;
maxErrors = inMaxErrors;
skipRows = inSkipRows;
skipCols = inSkipCols;
maxRows = inMaxRows;
badDir = inBadDir;
infile = inFile;
Expand Down Expand Up @@ -159,7 +162,7 @@ private void setup() throws IOException, ParseException {

cdp = new CqlDelimParser(cqlSchema, delimiter, nullString,
dateFormatString, boolStyle, locale,
session);
skipCols, session);
insert = cdp.generateInsert();
statement = session.prepare(insert);
statement.setRetryPolicy(new LoaderRetryPolicy(numRetries));
Expand Down
33 changes: 14 additions & 19 deletions src/main/java/com/datastax/loader/CqlDelimParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@
import com.datastax.loader.parser.SetParser;
import com.datastax.loader.parser.MapParser;

import java.lang.String;
import java.lang.IndexOutOfBoundsException;
import java.lang.NumberFormatException;
import java.text.ParseException;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.lang.String;
import java.text.ParseException;
import java.util.Locale;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.lang.IndexOutOfBoundsException;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ColumnDefinitions;
Expand All @@ -56,28 +58,15 @@ public class CqlDelimParser {
private String tablename;
private DelimParser delimParser;

/**
 * Convenience constructor that takes only the CQL schema and a session.
 * Delegates to the (schema, delimiter, nullString, session) constructor,
 * passing null for both the delimiter and the null string.
 *
 * @param inCqlSchema CQL schema string (required)
 * @param session     session handed through to the delegated constructor
 * @throws ParseException declared by the delegated constructor
 */
public CqlDelimParser(String inCqlSchema, Session session)
throws ParseException {
// Must supply a CQL schema
this(inCqlSchema, null, null, session);
}

/**
 * Convenience constructor that additionally accepts the delimiter and the
 * null string. Delegates to the seven-argument constructor, passing null for
 * the date format string, the boolean style and the locale.
 *
 * @param inCqlSchema  CQL schema string
 * @param inDelimiter  field delimiter, or null
 * @param inNullString string that signifies NULL, or null
 * @param session      session handed through to the delegated constructor
 * @throws ParseException declared by the delegated constructor
 */
public CqlDelimParser(String inCqlSchema, String inDelimiter,
String inNullString, Session session)
throws ParseException {
// Optionally provide things for the DelimParser - delimiter, null string
this(inCqlSchema, inDelimiter, inNullString, null, null, null, session);
}

public CqlDelimParser(String inCqlSchema, String inDelimiter,
String inNullString, String inDateFormatString,
BooleanParser.BoolStyle inBoolStyle, Locale inLocale,
Session session)
String skipList, Session session)
throws ParseException {
// Optionally provide things for the line parser - date format, boolean format, locale
initPmap(inDateFormatString, inBoolStyle, inLocale);
processCqlSchema(inCqlSchema, session);
createDelimParser(inDelimiter, inNullString);
createDelimParser(inDelimiter, inNullString, skipList);
}

// used internally to store schema information
Expand Down Expand Up @@ -198,10 +187,16 @@ else if (sb.datatype == DataType.Name.MAP) {
}

// Creates the DelimParser that will parse the line
private void createDelimParser(String delimiter, String nullString) {
/**
 * Creates the DelimParser that will parse each input line.
 *
 * One field parser is registered per schema entry in sbl, in order. If a
 * skip list is given, each listed column index is registered with the
 * DelimParser via addSkip().
 *
 * @param delimiter  field delimiter passed to the DelimParser
 * @param nullString string that signifies NULL, passed to the DelimParser
 * @param skipList   comma-separated list of column indexes to skip, or null
 *                   for none; surrounding whitespace and blank entries
 *                   (e.g. "" or "1,,2") are ignored
 * @throws NumberFormatException if a non-blank entry is not a valid integer
 */
private void createDelimParser(String delimiter, String nullString,
                               String skipList) throws NumberFormatException {
    delimParser = new DelimParser(delimiter, nullString);
    for (int i = 0; i < sbl.size(); i++) {
        delimParser.add(sbl.get(i).parser);
    }
    if (null == skipList) {
        return;
    }
    for (String entry : skipList.split(",")) {
        String column = entry.trim();
        // An empty -skipCols value or a doubled comma yields a blank entry;
        // ignore it instead of failing in Integer.parseInt("").
        if (column.isEmpty()) {
            continue;
        }
        delimParser.addSkip(Integer.parseInt(column));
    }
}

// Convenience method to return the INSERT statement for a PreparedStatement.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/datastax/loader/CqlDelimUnload.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@


public class CqlDelimUnload {
private String version = "0.0.11";
private String version = "0.0.12";
private String host = null;
private int port = 9042;
private String username = null;
Expand Down Expand Up @@ -395,7 +395,7 @@ private String getPartitionKey(CqlDelimParser cdp, Session tsession) {
private void setup() throws IOException, ParseException {
cdp = new CqlDelimParser(cqlSchema, delimiter, nullString,
dateFormatString,
boolStyle, locale, session);
boolStyle, locale, null, session);
String select = cdp.generateSelect();
String partitionKey = getPartitionKey(cdp, session);
if (null != beginToken) {
Expand Down
66 changes: 62 additions & 4 deletions src/main/java/com/datastax/loader/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package com.datastax.loader;

import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Snapshot;

public class RateLimiter {
private com.google.common.util.concurrent.RateLimiter rateLimiter;
private AtomicLong numAcquires;
private static long updateRate = 100000;
private Timer timer;
private PrintStream stream;
private long lastVal;
private long firstTime;
private long lastTime;
Expand All @@ -31,18 +36,63 @@ public RateLimiter(double inRate) {
}

/**
 * Creates a rate limiter without statistics output.
 * Delegates to the four-argument constructor with a null timer and a null
 * stream.
 *
 * @param inRate       rate passed through to the four-argument constructor
 * @param inUpdateRate update rate passed through to the four-argument constructor
 */
public RateLimiter(double inRate, long inUpdateRate) {
this(inRate, inUpdateRate, null, null);
}

public RateLimiter(double inRate, long inUpdateRate,
Timer inTimer, PrintStream inStream) {
rateLimiter = com.google.common.util.concurrent.RateLimiter.create(inRate);
updateRate = inUpdateRate;
timer = inTimer;
stream = inStream;
if ((null != stream) && (null != timer)) {
printHeader();
}
numAcquires = new AtomicLong(0);
lastTime = System.currentTimeMillis();
firstTime = lastTime;
lastVal = 0;
}

protected synchronized void incrementAndReport(int permits) {
long currentVal = numAcquires.addAndGet(permits);
long currentTime = System.currentTimeMillis();
if (0 == currentVal % updateRate) {
/**
 * Writes the CSV column header line to the statistics stream.
 * The caller must ensure the stream is non-null.
 */
protected void printHeader() {
    final String header = "Count,Min,Max,Mean,StdDev,50th,75th,95th,98th,99th,999th,MeanRate,1MinuteRate,5MinuteRate,15MinuteRate";
    stream.println(header);
}

/**
 * Writes one CSV row of timer statistics to the statistics stream.
 *
 * Values appear in the same order as the header written by printHeader():
 * count, min, max, mean, standard deviation, the 50th/75th/95th/98th/99th/
 * 99.9th percentiles, then the mean, 1-, 5- and 15-minute rates.
 * The caller must ensure both the stream and the timer are non-null.
 */
protected void printStats() {
    Snapshot snapshot = timer.getSnapshot();
    // Format with Locale.ROOT so the decimal separator is always '.'.
    // With the JVM default locale, a comma-decimal locale would put commas
    // inside the values and corrupt the comma-separated row.
    String row = String.format(java.util.Locale.ROOT,
            "%d,%d,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f",
            timer.getCount(),
            snapshot.getMin(),
            snapshot.getMax(),
            snapshot.getMean(),
            snapshot.getStdDev(),
            snapshot.getMedian(),
            snapshot.get75thPercentile(),
            snapshot.get95thPercentile(),
            snapshot.get98thPercentile(),
            snapshot.get99thPercentile(),
            snapshot.get999thPercentile(),
            timer.getMeanRate(),
            timer.getOneMinuteRate(),
            timer.getFiveMinuteRate(),
            timer.getFifteenMinuteRate());
    stream.println(row);
}

public void report(Long currentVal, Long currentTime) {
if ((null != stream) && (null != timer)) {
printStats();
}
if (null == currentVal) {
currentVal = numAcquires.get() - 1;
currentTime = System.currentTimeMillis();
System.err.println("Lines Processed: \t" + currentVal
+ " Rate: \t" +
(currentVal)
/((currentTime - firstTime) / 1000)
);
}
else {
System.err.println("Lines Processed: \t" + currentVal
+ " Rate: \t" +
(currentVal)
Expand All @@ -52,6 +102,14 @@ protected synchronized void incrementAndReport(int permits) {
/ ((currentTime - lastTime) / 1000)
+ ")"
);
}
}

protected synchronized void incrementAndReport(int permits) {
long currentVal = numAcquires.addAndGet(permits);
long currentTime = System.currentTimeMillis();
if (0 == currentVal % updateRate) {
report(currentVal, currentTime);
lastTime = currentTime;
lastVal = currentVal;
}
Expand Down
Loading

0 comments on commit 36fc257

Please sign in to comment.