Skip to content

Commit

Permalink
updated unloader
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmhess committed Aug 3, 2015
1 parent 8f8fa7a commit 989726e
Show file tree
Hide file tree
Showing 21 changed files with 272 additions and 99 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ apply plugin: 'application'

def versionNum = '0.0.13'

allprojects {
tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:unchecked"
}
}

task loader(type: Exec) {
dependsOn << 'uberloader'
commandLine './src/make/buildit.sh'
Expand Down
203 changes: 171 additions & 32 deletions src/main/java/com/datastax/loader/CqlDelimUnload.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.lang.System;
import java.lang.String;
import java.lang.StringBuilder;
import java.lang.Integer;
import java.util.List;
import java.util.ArrayList;
Expand All @@ -37,21 +38,38 @@
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.text.ParseException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.KeyStoreException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

Expand All @@ -62,6 +80,11 @@ public class CqlDelimUnload {
private int port = 9042;
private String username = null;
private String password = null;
private String truststorePath = null;
private String truststorePwd = null;
private String keystorePath = null;
private String keystorePwd = null;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.LOCAL_ONE;
private Cluster cluster = null;
private Session session = null;
private String beginToken = "-9223372036854775808";
Expand All @@ -79,21 +102,27 @@ public class CqlDelimUnload {
private int numThreads = 5;

private String usage() {
String usage = "version: " + version + "\n";
usage = usage + "Usage: -f <outputStem> -host <ipaddress> -schema <schema> [OPTIONS]\n";
usage = usage + "OPTIONS:\n";
usage = usage + " -delim <delimiter> Delimiter to use [,]\n";
usage = usage + " -dateFormat <dateFormatString> Date format [default for Locale.ENGLISH]\n";
usage = usage + " -nullString <nullString> String that signifies NULL [none]\n";
usage = usage + " -port <portNumber> CQL Port Number [9042]\n";
usage = usage + " -user <username> Cassandra username [none]\n";
usage = usage + " -pw <password> Password for user [none]\n";
usage = usage + " -decimalDelim <decimalDelim> Decimal delimiter [.] Other option is ','\n";
usage = usage + " -boolStyle <boolStyleString> Style for booleans [TRUE_FALSE]\n";
usage = usage + " -numThreads <numThreads> Number of concurrent threads (files) to load [5]\n";
usage = usage + " -beginToken <tokenString> Begin token [none]\n";
usage = usage + " -endToken <tokenString> End token [none]\n";
return usage;
StringBuilder usage = new StringBuilder("version: ").append(version).append("\n");
usage.append("Usage: -f <outputStem> -host <ipaddress> -schema <schema> [OPTIONS]\n");
usage.append("OPTIONS:\n");
usage.append(" -configFile <filename> File with configuration options\n");
usage.append(" -delim <delimiter> Delimiter to use [,]\n");
usage.append(" -dateFormat <dateFormatString> Date format [default for Locale.ENGLISH]\n");
usage.append(" -nullString <nullString> String that signifies NULL [none]\n");
usage.append(" -port <portNumber> CQL Port Number [9042]\n");
usage.append(" -user <username> Cassandra username [none]\n");
usage.append(" -pw <password> Password for user [none]\n");
usage.append(" -ssl-truststore-path <path> Path to SSL truststore [none]\n");
usage.append(" -ssl-truststore-pw <pwd> Password for SSL truststore [none]\n");
usage.append(" -ssl-keystore-path <path> Path to SSL keystore [none]\n");
usage.append(" -ssl-keystore-pw <pwd> Password for SSL keystore [none]\n");
usage.append(" -consistencyLevel <CL> Consistency level [LOCAL_ONE]");
usage.append(" -decimalDelim <decimalDelim> Decimal delimiter [.] Other option is ','\n");
usage.append(" -boolStyle <boolStyleString> Style for booleans [TRUE_FALSE]\n");
usage.append(" -numThreads <numThreads> Number of concurrent threads (files) to load [5]\n");
usage.append(" -beginToken <tokenString> Begin token [none]\n");
usage.append(" -endToken <tokenString> End token [none]\n");
return usage.toString();
}

private boolean validateArgs() {
Expand All @@ -112,21 +141,75 @@ private boolean validateArgs() {
if (filename.equalsIgnoreCase("stdout")) {
numThreads = 1;
}
/*
if (null == beginToken) {
System.err.println("You must specify the begin token");
if ((null == truststorePath) && (null != truststorePwd)) {
System.err.println("If you supply the ssl-truststore-pwd, you must supply the ssl-truststore-path");
return false;
}
if ((null != truststorePath) && (null == truststorePwd)) {
System.err.println("If you supply the ssl-truststore-path, you must supply the ssl-truststore-pwd");
return false;
}
if ((null == keystorePath) && (null != keystorePwd)) {
System.err.println("If you supply the ssl-keystore-pwd, you must supply the ssl-keystore-path");
return false;
}
if ((null != keystorePath) && (null == keystorePwd)) {
System.err.println("If you supply the ssl-keystore-path, you must supply the ssl-keystore-pwd");
return false;
}
File tfile = null;
if (null != truststorePath) {
tfile = new File(truststorePath);
if (!tfile.isFile()) {
System.err.println("truststore file must be a file");
return false;
}
}
if (null != keystorePath) {
tfile = new File(keystorePath);
if (!tfile.isFile()) {
System.err.println("keystore file must be a file");
return false;
}
}
if ((null != beginToken) && (null == endToken)) {
System.err.println("If you supply the beginToken then you need to specify the endToken");
return false;
}
if (null == endToken) {
System.err.println("You must specify the end token");
if ((null == beginToken) && (null != endToken)) {
System.err.println("If you supply the endToken then you need to specify the beginToken");
return false;
}
*/

return true;
}

private boolean parseArgs(String[] args) {
private boolean processConfigFile(String fname, Map<String, String> amap)
throws IOException, FileNotFoundException {
File cFile = new File(fname);
if (!cFile.isFile()) {
System.err.println("Configuration File must be a file");
return false;
}

BufferedReader cReader = new BufferedReader(new FileReader(cFile));
String line;
while ((line = cReader.readLine()) != null) {
String[] fields = line.trim().split("\\s+");
if (2 != fields.length) {
System.err.println("Bad line in config file: " + line);
return false;
}
if (null == amap.get(fields[0])) {
amap.put(fields[0], fields[1]);
}
}
return true;
}

private boolean parseArgs(String[] args)
throws IOException, FileNotFoundException {
String tkey;
if (args.length == 0) {
System.err.println("No arguments specified");
return false;
Expand All @@ -139,6 +222,10 @@ private boolean parseArgs(String[] args) {
amap.put(args[i], args[i+1]);
}

if (null != (tkey = amap.remove("-configFile")))
if (!processConfigFile(tkey, amap))
return false;

host = amap.remove("-host");
if (null == host) { // host is required
System.err.println("Must provide a host");
Expand All @@ -157,10 +244,14 @@ private boolean parseArgs(String[] args) {
return false;
}

String tkey;
if (null != (tkey = amap.remove("-port"))) port = Integer.parseInt(tkey);
if (null != (tkey = amap.remove("-user"))) username = tkey;
if (null != (tkey = amap.remove("-pw"))) password = tkey;
if (null != (tkey = amap.remove("-ssl-truststore-path"))) truststorePath = tkey;
if (null != (tkey = amap.remove("-ssl-truststore-pwd"))) truststorePwd = tkey;
if (null != (tkey = amap.remove("-ssl-keystore-path"))) keystorePath = tkey;
if (null != (tkey = amap.remove("-ssl-keystore-pwd"))) keystorePwd = tkey;
if (null != (tkey = amap.remove("-consistencyLevel"))) consistencyLevel = ConsistencyLevel.valueOf(tkey);
if (null != (tkey = amap.remove("-dateFormat"))) dateFormatString = tkey;
if (null != (tkey = amap.remove("-nullString"))) nullString = tkey;
if (null != (tkey = amap.remove("-delim"))) delimiter = tkey;
Expand All @@ -187,16 +278,55 @@ private boolean parseArgs(String[] args) {
return validateArgs();
}

private void setup() {
private SSLOptions createSSLContext()
throws KeyStoreException, FileNotFoundException, IOException, NoSuchAlgorithmException,
KeyManagementException, CertificateException, UnrecoverableKeyException {
TrustManagerFactory tmf = null;
KeyStore tks = KeyStore.getInstance("JKS");
tks.load((InputStream) new FileInputStream(new File(truststorePath)),
truststorePwd.toCharArray());
tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(tks);

KeyManagerFactory kmf = null;
if (null != keystorePath) {
KeyStore kks = KeyStore.getInstance("JKS");
kks.load((InputStream) new FileInputStream(new File(keystorePath)),
keystorePwd.toCharArray());
kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(kks, keystorePwd.toCharArray());
}

SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf != null? kmf.getKeyManagers() : null,
tmf != null ? tmf.getTrustManagers() : null,
new SecureRandom());

return new SSLOptions(sslContext, SSLOptions.DEFAULT_SSL_CIPHER_SUITES);
}

private void setup()
throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, UnrecoverableKeyException {
// Connect to Cassandra
PoolingOptions pOpts = new PoolingOptions();
pOpts.setCoreConnectionsPerHost(HostDistance.LOCAL, 4);
pOpts.setMaxConnectionsPerHost(HostDistance.LOCAL, 4);
Cluster.Builder clusterBuilder = Cluster.builder()
.addContactPoint(host)
.withPort(port)
.withPoolingOptions(pOpts)
.withLoadBalancingPolicy(new TokenAwarePolicy( new DCAwareRoundRobinPolicy()));
if (null != username)
clusterBuilder = clusterBuilder.withCredentials(username, password);
if (null != truststorePath)
clusterBuilder = clusterBuilder.withSSL(createSSLContext());

cluster = clusterBuilder.build();
session = cluster.newSession();
if (null == cluster) {
throw new IOException("Could not create cluster");
}
session = cluster.connect();
}

private void cleanup() {
Expand All @@ -206,9 +336,10 @@ private void cleanup() {
cluster.close();
}

public boolean run(String[] args) throws IOException, ParseException,
InterruptedException,
ExecutionException {
public boolean run(String[] args)
throws IOException, ParseException, InterruptedException, ExecutionException,
KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, UnrecoverableKeyException {
if (false == parseArgs(args)) {
System.err.println("Bad arguments");
System.err.println(usage());
Expand Down Expand Up @@ -240,7 +371,8 @@ public boolean run(String[] args) throws IOException, ParseException,
boolStyle, locale,
pstream,
beginToken,
endToken, session);
endToken, session,
consistencyLevel);
Future<Long> res = executor.submit(worker);
total = res.get();
executor.shutdown();
Expand Down Expand Up @@ -288,7 +420,8 @@ public boolean run(String[] args) throws IOException, ParseException,
boolStyle, locale,
pstream,
tBeginString,
tEndString, session);
tEndString, session,
consistencyLevel);
results.add(executor.submit(worker));
}
executor.shutdown();
Expand All @@ -303,7 +436,10 @@ public boolean run(String[] args) throws IOException, ParseException,
return true;
}

public static void main(String[] args) throws IOException, ParseException, InterruptedException, ExecutionException {
public static void main(String[] args)
throws IOException, ParseException, InterruptedException, ExecutionException,
KeyStoreException, NoSuchAlgorithmException, UnrecoverableKeyException,
CertificateException, KeyManagementException {
CqlDelimUnload cdu = new CqlDelimUnload();
boolean success = cdu.run(args);
if (success) {
Expand All @@ -315,6 +451,7 @@ public static void main(String[] args) throws IOException, ParseException, Inter

class ThreadExecute implements Callable<Long> {
private Session session;
private ConsistencyLevel consistencyLevel;
private PreparedStatement statement;
private CqlDelimParser cdp;

Expand All @@ -337,7 +474,7 @@ public ThreadExecute(String inCqlSchema, String inDelimiter,
Locale inLocale,
PrintStream inWriter,
String inBeginToken, String inEndToken,
Session inSession) {
Session inSession, ConsistencyLevel inConsistencyLevel) {
super();
cqlSchema = inCqlSchema;
delimiter = inDelimiter;
Expand All @@ -349,6 +486,7 @@ public ThreadExecute(String inCqlSchema, String inDelimiter,
endToken = inEndToken;
session = inSession;
writer = inWriter;
consistencyLevel = inConsistencyLevel;
}

public Long call() throws IOException, ParseException {
Expand Down Expand Up @@ -404,6 +542,7 @@ private void setup() throws IOException, ParseException {
+ endToken;
}
statement = session.prepare(select);
statement.setConsistencyLevel(consistencyLevel);
}

private void cleanup() throws IOException {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/com/datastax/loader/parser/AbstractParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@

public abstract class AbstractParser implements Parser {
public abstract Object parse(String toparse) throws ParseException;
public abstract String format(Row row, int index) throws IndexOutOfBoundsException, InvalidTypeException;
public String format(Row row, int index) throws IndexOutOfBoundsException, InvalidTypeException {
if (row.isNull(index))
return null;
return format(row.getObject(index));
}
public abstract String format(Object o);

public Object parse(IndexedLine il, String nullString, Character delim,
Character escape, Character quote, boolean last)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public BigDecimal parse(String toparse) throws NumberFormatException {
return new BigDecimal(toparse);
}

public String format(Row row, int index) throws IndexOutOfBoundsException, InvalidTypeException {
return row.isNull(index) ? null : row.getDecimal(index).toString();
public String format(Object o) {
BigDecimal v = (BigDecimal)o;
return v.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public BigInteger parse(String toparse) throws NumberFormatException {
return new BigInteger(toparse);
}

public String format(Row row, int index) throws IndexOutOfBoundsException, InvalidTypeException {
return row.isNull(index) ? null : row.getVarint(index).toString();
public String format(Object o) {
BigInteger v = (BigInteger)o;
return v.toString();
}
}
Loading

0 comments on commit 989726e

Please sign in to comment.