diff --git a/.gitignore b/.gitignore
index 0f182a0..948270e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,7 @@
*.jar
*.war
*.ear
+*.classpath
+*.project
+*.settings/
+target/
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..137dfcf
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,77 @@
+
+ 4.0.0
+ com.edi.storm
+ storm-samples
+ 0.0.1-SNAPSHOT
+ jar
+
+
+ UTF-8
+
+
+
+
+ clojars.org
+ http://clojars.org/repo
+
+
+
+
+ storm-samples
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ 1.7
+ ${project.build.sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+
+
+ storm
+ storm
+ 0.9.0-rc2
+ provided
+
+
+
+ c3p0
+ c3p0
+ 0.9.1.2
+ compile
+
+
+
+ mysql
+ mysql-connector-java
+ 5.1.26
+ compile
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/edi/storm/TransactionalDBWriter.java b/src/main/java/com/edi/storm/TransactionalDBWriter.java
new file mode 100644
index 0000000..7f9c786
--- /dev/null
+++ b/src/main/java/com/edi/storm/TransactionalDBWriter.java
@@ -0,0 +1,266 @@
+/**
+ *
+ */
+package com.edi.storm;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import com.edi.storm.util.DBManager;
+
+/**
+ * @author Edison Xu
+ *
+ * Dec 5, 2013
+ */
+public class TransactionalDBWriter {
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ BatchSpout spout = new BatchSpout(1);
+ if (args.length == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("trans-db-writer", conf,
+ buildTopology(spout));
+ Thread.sleep(1000);
+ } else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[0], conf, buildTopology(spout));
+ }
+ }
+
+ static class BatchSpout implements IBatchSpout {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BatchSpout.class);
+ private int batchSize;
+ private static final AtomicInteger seq = new AtomicInteger(0);
+
+ public BatchSpout() {
+ super();
+ this.batchSize = 1;
+ }
+
+ public BatchSpout(int batchSize) {
+ super();
+ this.batchSize = batchSize;
+ }
+
+ public void open(Map conf, TopologyContext context) {
+
+ }
+
+ public void emitBatch(long batchId, TridentCollector collector) {
+ TransObj obj = new TransObj(seq.getAndIncrement(), System.currentTimeMillis());
+ LOGGER.info("Fire!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
+ collector.emit(new Values(obj));
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void ack(long batchId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public Map getComponentConfiguration() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public Fields getOutputFields() {
+ return new Fields("obj");
+ }
+
+ }
+
+ public static StormTopology buildTopology(BatchSpout spout) {
+ TridentTopology topology = new TridentTopology();
+
+ topology.newStream("transwrite", spout)
+ .each(new Fields("obj"), new WriteDB(), new Fields("after"))
+ ;
+ return topology.build();
+ }
+
+ static class TransObj implements Serializable{
+ private static final long serialVersionUID = -2220707446968927825L;
+ private int seq;
+ private long initTime;
+ public int getSeq() {
+ return seq;
+ }
+ public void setSeq(int seq) {
+ this.seq = seq;
+ }
+ public long getInitTime() {
+ return initTime;
+ }
+ public void setInitTime(long initTime) {
+ this.initTime = initTime;
+ }
+ public TransObj(int seq, long initTime) {
+ super();
+ this.seq = seq;
+ this.initTime = initTime;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (initTime ^ (initTime >>> 32));
+ result = prime * result + seq;
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TransObj other = (TransObj) obj;
+ if (initTime != other.initTime)
+ return false;
+ if (seq != other.seq)
+ return false;
+ return true;
+ }
+ }
+
+ private static class WriteDB extends BaseFunction {
+ private static List list = new ArrayList();
+ private int MAX_SIZE = 10;
+ private static final String INSERT_STRING = "INSERT INTO `STORM_TRANS_TEST` (`seq`,`init_time`,`insert_time`,`latency`) values (%d,%d,%d,%d)";
+ private static final String P_INSERT_STRING = "INSERT INTO `STORM_TRANS_TEST` (`seq`,`init_time`,`insert_time`,`latency`) values (?,?,?,?)";
+ private boolean batch = false;
+ private static final Logger LOGGER = LoggerFactory.getLogger(WriteDB.class);
+
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ LOGGER.info("Executed!!!!!!!!!!!!!!!!!!!!!!!!!");
+ if(tuple==null) return;
+ TransObj obj = null;
+ long beginTime = System.currentTimeMillis();
+ if(tuple.get(0)!=null && tuple.get(0) instanceof TransObj)
+ {
+ obj = (TransObj) tuple.get(0);
+ System.out.println("ExeLatency: " + (beginTime -obj.getInitTime()));
+ }
+ if(obj==null) return;
+ if(batch)
+ {
+ synchronized (list) {
+ if(list.size()>=MAX_SIZE)
+ {
+ insertData();
+ list.clear();
+ }else
+ {
+ /*String dml = String.format(INSERT_STRING, obj.getSeq(),obj.getInitTime(),System.currentTimeMillis());
+ list.add(dml);*/
+ list.add(obj);
+ }
+ }
+ }else
+ {
+ insertImmediately(obj);
+ }
+
+ System.out.println("Latency2: " + (System.currentTimeMillis()-beginTime));
+ System.out.println("TotalLatency: " + (System.currentTimeMillis()-obj.getInitTime()));
+ collector.emit(new Values(obj));
+ }
+
+ private void insertImmediately(TransObj obj)
+ {
+ Connection conn = null;
+ try {
+ conn = DBManager.getConnection();
+ /*PreparedStatement prepStatement = conn.prepareStatement(P_INSERT_STRING);
+ prepStatement.setInt(1, obj.getSeq());
+ prepStatement.setLong(2, obj.getInitTime());
+ prepStatement.setLong(3, System.currentTimeMillis());
+ prepStatement.setLong(4, (System.currentTimeMillis()-obj.getInitTime()));
+ prepStatement.addBatch();
+ prepStatement.executeBatch();*/
+
+ String dml = String.format(INSERT_STRING, obj.getSeq(),obj.getInitTime(),System.currentTimeMillis(),(System.currentTimeMillis()-obj.getInitTime()));
+ Statement stmt = conn.createStatement();
+ stmt.addBatch(dml);
+ stmt.executeBatch();
+ System.out.println("============Insert OK==============");
+ } catch (SQLException e1) {
+ e1.printStackTrace();
+ }finally{
+ if(conn!=null)
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void insertData()
+ {
+ Connection conn = null;
+ try {
+ conn = DBManager.getConnection();
+ PreparedStatement prepStatement = conn.prepareStatement(P_INSERT_STRING);
+ for(TransObj obj:list)
+ {
+ /*Statement stmt = conn.createStatement();
+ stmt.addBatch(dml);
+ stmt.executeBatch();*/
+ prepStatement.setInt(1, obj.getSeq());
+ prepStatement.setLong(2, obj.getInitTime());
+ prepStatement.setLong(3, System.currentTimeMillis());
+ prepStatement.setLong(4, (System.currentTimeMillis()-obj.getInitTime()));
+ prepStatement.addBatch();
+ }
+ prepStatement.executeBatch();
+ System.out.println("============Insert OK==============");
+ } catch (SQLException e1) {
+ e1.printStackTrace();
+ }finally{
+ if(conn!=null)
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java b/src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java
new file mode 100644
index 0000000..aecc61b
--- /dev/null
+++ b/src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java
@@ -0,0 +1,32 @@
+package com.edi.storm.bolts;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ *
+ * A bolt that add "!" to the end of the sentence.
+ * @author Edison Xu
+ *
+ * Dec 30, 2013
+ */
+public class ExclaimBasicBolt extends BaseBasicBolt {
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ //String sentence = tuple.getString(0);
+ String sentence = (String) tuple.getValue(0);
+ String out = sentence + "!";
+ collector.emit(new Values(out));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("excl_sentence"));
+ }
+
+}
diff --git a/src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java b/src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java
new file mode 100644
index 0000000..257cc8c
--- /dev/null
+++ b/src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java
@@ -0,0 +1,39 @@
+package com.edi.storm.bolts;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ * @author Edison Xu
+ *
+ * Dec 30, 2013
+ */
+public class ExclaimRichBolt extends BaseRichBolt {
+
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
+ OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ this.collector.emit(tuple, new Values(tuple.getString(0)+"!"));
+ this.collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("after_excl"));
+ }
+
+}
diff --git a/src/main/java/com/edi/storm/bolts/PrintBolt.java b/src/main/java/com/edi/storm/bolts/PrintBolt.java
new file mode 100644
index 0000000..55e7384
--- /dev/null
+++ b/src/main/java/com/edi/storm/bolts/PrintBolt.java
@@ -0,0 +1,38 @@
+package com.edi.storm.bolts;
+
+import java.util.Map;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Print the String received
+ *
+ * @author Edison Xu
+ *
+ * Dec 30, 2013
+ */
+public class PrintBolt extends BaseBasicBolt {
+
+ private int indexId;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ this.indexId = context.getThisTaskIndex();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String rec = tuple.getString(0);
+ System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // do nothing
+ }
+
+}
diff --git a/src/main/java/com/edi/storm/spouts/RandomSpout.java b/src/main/java/com/edi/storm/spouts/RandomSpout.java
new file mode 100644
index 0000000..9a91d39
--- /dev/null
+++ b/src/main/java/com/edi/storm/spouts/RandomSpout.java
@@ -0,0 +1,47 @@
+package com.edi.storm.spouts;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+/**
+ *
+ * A spout generates random stuff.
+ *
+ * @author Edison Xu
+ *
+ * Dec 30, 2013
+ */
+public class RandomSpout extends BaseRichSpout {
+
+ private SpoutOutputCollector collector;
+
+ private Random rand;
+
+ private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
+
+ @Override
+ public void open(Map conf, TopologyContext context,
+ SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.rand = new Random();
+ }
+
+ @Override
+ public void nextTuple() {
+ String toSay = sentences[rand.nextInt(sentences.length)];
+ this.collector.emit(new Values(toSay));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence"));
+ }
+
+}
diff --git a/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java b/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java
new file mode 100644
index 0000000..fd00902
--- /dev/null
+++ b/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java
@@ -0,0 +1,44 @@
+package com.edi.storm.topos;
+
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+
+import com.edi.storm.bolts.ExclaimBasicBolt;
+import com.edi.storm.bolts.PrintBolt;
+import com.edi.storm.spouts.RandomSpout;
+
+/**
+ * @author Edison Xu
+ *
+ * Dec 30, 2013
+ */
+public class ExclaimBasicTopo {
+
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSpout());
+ builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
+ builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");
+
+ Config conf = new Config();
+ conf.setDebug(false);
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ } else {
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(100000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ }
+ }
+}
diff --git a/src/main/java/com/edi/storm/util/DBManager.java b/src/main/java/com/edi/storm/util/DBManager.java
new file mode 100644
index 0000000..a4ec315
--- /dev/null
+++ b/src/main/java/com/edi/storm/util/DBManager.java
@@ -0,0 +1,53 @@
+/**
+ *
+ */
+package com.edi.storm.util;
+
+import java.beans.PropertyVetoException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import com.mchange.v2.c3p0.ComboPooledDataSource;
+
+/**
+ * @author Edison Xu
+ *
+ * Dec 4, 2013
+ */
+public class DBManager {
+
+ private static final DBManager instance = new DBManager();
+
+ private static ComboPooledDataSource cpds = new ComboPooledDataSource(true);
+
+ static {
+ cpds.setDataSourceName("mydatasource");
+ cpds.setJdbcUrl("jdbc:mysql://10.1.110.21:3306/metadata?useUnicode=true&characterEncoding=utf8");
+ try {
+ cpds.setDriverClass("com.mysql.jdbc.Driver");
+ } catch (PropertyVetoException e) {
+ e.printStackTrace();
+ }
+ cpds.setUser("root");
+ cpds.setPassword("111111");
+ cpds.setMaxPoolSize(10);
+ cpds.setMinPoolSize(2);
+ cpds.setAcquireIncrement(5);
+ cpds.setInitialPoolSize(5);
+ cpds.setMaxIdleTime(9);
+ }
+
+ private DBManager(){}
+ public static DBManager getInstance(){
+ return instance;
+ }
+
+ public static Connection getConnection(){
+ try {
+ return cpds.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+}