From dd19ffdf610fb08036249697011a92a72e2f0442 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 30 Dec 2013 19:46:54 +0800 Subject: [PATCH] add new files --- .gitignore | 4 + pom.xml | 77 +++++ .../com/edi/storm/TransactionalDBWriter.java | 266 ++++++++++++++++++ .../com/edi/storm/bolts/ExclaimBasicBolt.java | 32 +++ .../com/edi/storm/bolts/ExclaimRichBolt.java | 39 +++ .../java/com/edi/storm/bolts/PrintBolt.java | 38 +++ .../com/edi/storm/spouts/RandomSpout.java | 47 ++++ .../com/edi/storm/topos/ExclaimBasicTopo.java | 44 +++ .../java/com/edi/storm/util/DBManager.java | 53 ++++ 9 files changed, 600 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/com/edi/storm/TransactionalDBWriter.java create mode 100644 src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java create mode 100644 src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java create mode 100644 src/main/java/com/edi/storm/bolts/PrintBolt.java create mode 100644 src/main/java/com/edi/storm/spouts/RandomSpout.java create mode 100644 src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java create mode 100644 src/main/java/com/edi/storm/util/DBManager.java diff --git a/.gitignore b/.gitignore index 0f182a0..948270e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,7 @@ *.jar *.war *.ear +*.classpath +*.project +*.settings/ +target/ \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..137dfcf --- /dev/null +++ b/pom.xml @@ -0,0 +1,77 @@ + + 4.0.0 + com.edi.storm + storm-samples + 0.0.1-SNAPSHOT + jar + + + UTF-8 + + + + + clojars.org + http://clojars.org/repo + + + + + storm-samples + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + ${project.build.sourceEncoding} + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + + + storm + storm + 0.9.0-rc2 + provided + + + + c3p0 + c3p0 + 0.9.1.2 + compile + + + + mysql + mysql-connector-java + 5.1.26 + compile + + + \ No newline at end of file diff --git a/src/main/java/com/edi/storm/TransactionalDBWriter.java b/src/main/java/com/edi/storm/TransactionalDBWriter.java new file mode 100644 index 0000000..7f9c786 --- /dev/null +++ b/src/main/java/com/edi/storm/TransactionalDBWriter.java @@ -0,0 +1,266 @@ +/** + * + */ +package com.edi.storm; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import storm.trident.TridentTopology; +import storm.trident.operation.BaseFunction; +import storm.trident.operation.TridentCollector; +import storm.trident.spout.IBatchSpout; +import storm.trident.tuple.TridentTuple; +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +import com.edi.storm.util.DBManager; + +/** + * @author Edison Xu + * + * Dec 5, 2013 + */ +public class TransactionalDBWriter { + + public static void main(String[] args) throws Exception { + Config conf = new Config(); + conf.setMaxSpoutPending(20); + BatchSpout spout = new BatchSpout(1); + if (args.length == 0) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("trans-db-writer", conf, + buildTopology(spout)); + Thread.sleep(1000); + } else { + conf.setNumWorkers(3); + StormSubmitter.submitTopology(args[0], conf, buildTopology(spout)); + } + } + + static class BatchSpout implements IBatchSpout { + private static final Logger LOGGER = LoggerFactory.getLogger(BatchSpout.class); + private int batchSize; + private static final AtomicInteger seq = new AtomicInteger(0); + + public BatchSpout() { + super(); + this.batchSize = 1; + } + + public BatchSpout(int batchSize) { + super(); + this.batchSize = batchSize; + } + + public void open(Map conf, TopologyContext context) { + + } + + public void emitBatch(long batchId, TridentCollector collector) { + TransObj obj = new TransObj(seq.getAndIncrement(), System.currentTimeMillis()); + LOGGER.info("Fire!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + collector.emit(new Values(obj)); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void ack(long batchId) { + // TODO Auto-generated method stub + + } + + public void close() { + // TODO Auto-generated method stub + + } + + public Map getComponentConfiguration() { + // TODO Auto-generated method stub + return null; + } + + public Fields getOutputFields() { + return new Fields("obj"); + } + + } + + public static StormTopology buildTopology(BatchSpout spout) { + TridentTopology topology = new TridentTopology(); + + topology.newStream("transwrite", spout) + .each(new Fields("obj"), new WriteDB(), new Fields("after")) + ; + return topology.build(); + } + + static class TransObj implements Serializable{ + private static final long serialVersionUID = -2220707446968927825L; + private int seq; + private long initTime; + public int getSeq() { + return seq; + } + public void setSeq(int seq) { + this.seq = seq; + } + public long getInitTime() { + return initTime; + } + public void setInitTime(long initTime) { + this.initTime = initTime; + } + public TransObj(int seq, long initTime) { + super(); + this.seq = seq; + this.initTime = initTime; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (initTime ^ (initTime >>> 32)); + result = prime * result + seq; + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TransObj other = (TransObj) obj; + if (initTime != other.initTime) + return false; + if (seq != other.seq) + return false; + return true; + } + } + + private static class WriteDB extends BaseFunction { + private static List list = new ArrayList(); + private int MAX_SIZE = 10; + private static final String INSERT_STRING = "INSERT INTO `STORM_TRANS_TEST` (`seq`,`init_time`,`insert_time`,`latency`) values (%d,%d,%d,%d)"; + private static final String P_INSERT_STRING = "INSERT INTO `STORM_TRANS_TEST` (`seq`,`init_time`,`insert_time`,`latency`) values (?,?,?,?)"; + private boolean batch = false; + private static final Logger LOGGER = LoggerFactory.getLogger(WriteDB.class); + + public void execute(TridentTuple tuple, TridentCollector collector) { + LOGGER.info("Executed!!!!!!!!!!!!!!!!!!!!!!!!!"); + if(tuple==null) return; + TransObj obj = null; + long beginTime = System.currentTimeMillis(); + if(tuple.get(0)!=null && tuple.get(0) instanceof TransObj) + { + obj = (TransObj) tuple.get(0); + System.out.println("ExeLatency: " + (beginTime -obj.getInitTime())); + } + if(obj==null) return; + if(batch) + { + synchronized (list) { + if(list.size()>=MAX_SIZE) + { + insertData(); + list.clear(); + }else + { + /*String dml = String.format(INSERT_STRING, obj.getSeq(),obj.getInitTime(),System.currentTimeMillis()); + list.add(dml);*/ + list.add(obj); + } + } + }else + { + insertImmediately(obj); + } + + System.out.println("Latency2: " + (System.currentTimeMillis()-beginTime)); + System.out.println("TotalLatency: " + (System.currentTimeMillis()-obj.getInitTime())); + collector.emit(new Values(obj)); + } + + private void insertImmediately(TransObj obj) + { + Connection conn = null; + try { + conn = DBManager.getConnection(); + /*PreparedStatement prepStatement = conn.prepareStatement(P_INSERT_STRING); + prepStatement.setInt(1, obj.getSeq()); + prepStatement.setLong(2, obj.getInitTime()); + prepStatement.setLong(3, System.currentTimeMillis()); + prepStatement.setLong(4, (System.currentTimeMillis()-obj.getInitTime())); + prepStatement.addBatch(); + prepStatement.executeBatch();*/ + + String dml = String.format(INSERT_STRING, obj.getSeq(),obj.getInitTime(),System.currentTimeMillis(),(System.currentTimeMillis()-obj.getInitTime())); + Statement stmt = conn.createStatement(); + stmt.addBatch(dml); + stmt.executeBatch(); + System.out.println("============Insert OK=============="); + } catch (SQLException e1) { + e1.printStackTrace(); + }finally{ + if(conn!=null) + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + private void insertData() + { + Connection conn = null; + try { + conn = DBManager.getConnection(); + PreparedStatement prepStatement = conn.prepareStatement(P_INSERT_STRING); + for(TransObj obj:list) + { + /*Statement stmt = conn.createStatement(); + stmt.addBatch(dml); + stmt.executeBatch();*/ + prepStatement.setInt(1, obj.getSeq()); + prepStatement.setLong(2, obj.getInitTime()); + prepStatement.setLong(3, System.currentTimeMillis()); + prepStatement.setLong(4, (System.currentTimeMillis()-obj.getInitTime())); + prepStatement.addBatch(); + } + prepStatement.executeBatch(); + System.out.println("============Insert OK=============="); + } catch (SQLException e1) { + e1.printStackTrace(); + }finally{ + if(conn!=null) + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java b/src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java new file mode 100644 index 0000000..aecc61b --- /dev/null +++ b/src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java @@ -0,0 +1,32 @@ +package com.edi.storm.bolts; + +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +/** + * + * A bolt that add "!" to the end of the sentence. + * @author Edison Xu + * + * Dec 30, 2013 + */ +public class ExclaimBasicBolt extends BaseBasicBolt { + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + //String sentence = tuple.getString(0); + String sentence = (String) tuple.getValue(0); + String out = sentence + "!"; + collector.emit(new Values(out)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("excl_sentence")); + } + +} diff --git a/src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java b/src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java new file mode 100644 index 0000000..257cc8c --- /dev/null +++ b/src/main/java/com/edi/storm/bolts/ExclaimRichBolt.java @@ -0,0 +1,39 @@ +package com.edi.storm.bolts; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +/** + * @author Edison Xu + * + * Dec 30, 2013 + */ +public class ExclaimRichBolt extends BaseRichBolt { + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { + this.collector.emit(tuple, new Values(tuple.getString(0)+"!")); + this.collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("after_excl")); + } + +} diff --git a/src/main/java/com/edi/storm/bolts/PrintBolt.java b/src/main/java/com/edi/storm/bolts/PrintBolt.java new file mode 100644 index 0000000..55e7384 --- /dev/null +++ b/src/main/java/com/edi/storm/bolts/PrintBolt.java @@ -0,0 +1,38 @@ +package com.edi.storm.bolts; + +import java.util.Map; + +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.tuple.Tuple; + +/** + * Print the String received + * + * @author Edison Xu + * + * Dec 30, 2013 + */ +public class PrintBolt extends BaseBasicBolt { + + private int indexId; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + this.indexId = context.getThisTaskIndex(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String rec = tuple.getString(0); + System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // do nothing + } + +} diff --git a/src/main/java/com/edi/storm/spouts/RandomSpout.java b/src/main/java/com/edi/storm/spouts/RandomSpout.java new file mode 100644 index 0000000..9a91d39 --- /dev/null +++ b/src/main/java/com/edi/storm/spouts/RandomSpout.java @@ -0,0 +1,47 @@ +package com.edi.storm.spouts; + +import java.util.Map; +import java.util.Random; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +/** + * + * A spout generates random stuff. + * + * @author Edison Xu + * + * Dec 30, 2013 + */ +public class RandomSpout extends BaseRichSpout { + + private SpoutOutputCollector collector; + + private Random rand; + + private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"}; + + @Override + public void open(Map conf, TopologyContext context, + SpoutOutputCollector collector) { + this.collector = collector; + this.rand = new Random(); + } + + @Override + public void nextTuple() { + String toSay = sentences[rand.nextInt(sentences.length)]; + this.collector.emit(new Values(toSay)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } + +} diff --git a/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java b/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java new file mode 100644 index 0000000..fd00902 --- /dev/null +++ b/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java @@ -0,0 +1,44 @@ +package com.edi.storm.topos; + + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; + +import com.edi.storm.bolts.ExclaimBasicBolt; +import com.edi.storm.bolts.PrintBolt; +import com.edi.storm.spouts.RandomSpout; + +/** + * @author Edison Xu + * + * Dec 30, 2013 + */ +public class ExclaimBasicTopo { + + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSpout()); + builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout"); + builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim"); + + Config conf = new Config(); + conf.setDebug(false); + + if (args != null && args.length > 0) { + conf.setNumWorkers(3); + + StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); + } else { + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, builder.createTopology()); + Utils.sleep(100000); + cluster.killTopology("test"); + cluster.shutdown(); + } + } +} diff --git a/src/main/java/com/edi/storm/util/DBManager.java b/src/main/java/com/edi/storm/util/DBManager.java new file mode 100644 index 0000000..a4ec315 --- /dev/null +++ b/src/main/java/com/edi/storm/util/DBManager.java @@ -0,0 +1,53 @@ +/** + * + */ +package com.edi.storm.util; + +import java.beans.PropertyVetoException; +import java.sql.Connection; +import java.sql.SQLException; + +import com.mchange.v2.c3p0.ComboPooledDataSource; + +/** + * @author Edison Xu + * + * Dec 4, 2013 + */ +public class DBManager { + + private static final DBManager instance = new DBManager(); + + private static ComboPooledDataSource cpds = new ComboPooledDataSource(true); + + static { + cpds.setDataSourceName("mydatasource"); + cpds.setJdbcUrl("jdbc:mysql://10.1.110.21:3306/metadata?useUnicode=true&characterEncoding=utf8"); + try { + cpds.setDriverClass("com.mysql.jdbc.Driver"); + } catch (PropertyVetoException e) { + e.printStackTrace(); + } + cpds.setUser("root"); + cpds.setPassword("111111"); + cpds.setMaxPoolSize(10); + cpds.setMinPoolSize(2); + cpds.setAcquireIncrement(5); + cpds.setInitialPoolSize(5); + cpds.setMaxIdleTime(9); + } + + private DBManager(){} + public static DBManager getInstance(){ + return instance; + } + + public static Connection getConnection(){ + try { + return cpds.getConnection(); + } catch (SQLException e) { + e.printStackTrace(); + } + return null; + } +}