Commit

add new files
unknown committed Dec 30, 2013
1 parent a5ddd9d commit dd19ffd
Showing 9 changed files with 600 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -4,3 +4,7 @@
*.jar
*.war
*.ear
*.classpath
*.project
*.settings/
target/
77 changes: 77 additions & 0 deletions pom.xml
@@ -0,0 +1,77 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.edi.storm</groupId>
<artifactId>storm-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

<build>
<finalName>storm-samples</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>

</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0-rc2</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.26</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
266 changes: 266 additions & 0 deletions src/main/java/com/edi/storm/TransactionalDBWriter.java
@@ -0,0 +1,266 @@
/**
*
*/
package com.edi.storm;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import storm.trident.tuple.TridentTuple;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.edi.storm.util.DBManager;

/**
* @author Edison Xu
*
* Dec 5, 2013
*/
public class TransactionalDBWriter {

public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(20);
BatchSpout spout = new BatchSpout(1);
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trans-db-writer", conf,
buildTopology(spout));
Thread.sleep(1000);
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, buildTopology(spout));
}
}

static class BatchSpout implements IBatchSpout {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchSpout.class);
private int batchSize;
private static final AtomicInteger seq = new AtomicInteger(0);

public BatchSpout() {
super();
this.batchSize = 1;
}

public BatchSpout(int batchSize) {
super();
this.batchSize = batchSize;
}

public void open(Map conf, TopologyContext context) {
    // no-op: this sample spout has nothing to initialize
}

public void emitBatch(long batchId, TridentCollector collector) {
TransObj obj = new TransObj(seq.getAndIncrement(), System.currentTimeMillis());
LOGGER.info("Fire!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
collector.emit(new Values(obj));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void ack(long batchId) {
    // no-op: this sample does not track batch acknowledgements
}

public void close() {
    // no-op: nothing to release
}

public Map getComponentConfiguration() {
    // no component-specific configuration for this spout
    return null;
}

public Fields getOutputFields() {
return new Fields("obj");
}

}

public static StormTopology buildTopology(BatchSpout spout) {
TridentTopology topology = new TridentTopology();

topology.newStream("transwrite", spout)
.each(new Fields("obj"), new WriteDB(), new Fields("after"))
;
return topology.build();
}

static class TransObj implements Serializable{
private static final long serialVersionUID = -2220707446968927825L;
private int seq;
private long initTime;
public int getSeq() {
return seq;
}
public void setSeq(int seq) {
this.seq = seq;
}
public long getInitTime() {
return initTime;
}
public void setInitTime(long initTime) {
this.initTime = initTime;
}
public TransObj(int seq, long initTime) {
super();
this.seq = seq;
this.initTime = initTime;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (initTime ^ (initTime >>> 32));
result = prime * result + seq;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TransObj other = (TransObj) obj;
if (initTime != other.initTime)
return false;
if (seq != other.seq)
return false;
return true;
}
}

private static class WriteDB extends BaseFunction {
private static List<TransObj> list = new ArrayList<TransObj>();
private static final int MAX_SIZE = 10;
private static final String INSERT_STRING = "INSERT INTO `STORM_TRANS_TEST` (`seq`,`init_time`,`insert_time`,`latency`) values (%d,%d,%d,%d)";
private static final String P_INSERT_STRING = "INSERT INTO `STORM_TRANS_TEST` (`seq`,`init_time`,`insert_time`,`latency`) values (?,?,?,?)";
private boolean batch = false;
private static final Logger LOGGER = LoggerFactory.getLogger(WriteDB.class);

public void execute(TridentTuple tuple, TridentCollector collector) {
LOGGER.info("Executed!!!!!!!!!!!!!!!!!!!!!!!!!");
if(tuple==null) return;
TransObj obj = null;
long beginTime = System.currentTimeMillis();
if(tuple.get(0)!=null && tuple.get(0) instanceof TransObj)
{
obj = (TransObj) tuple.get(0);
System.out.println("ExeLatency: " + (beginTime -obj.getInitTime()));
}
if(obj==null) return;
if (batch) {
    synchronized (list) {
        // Buffer the tuple first, then flush once the batch threshold is reached,
        // so the tuple that triggers the flush is not silently dropped.
        list.add(obj);
        if (list.size() >= MAX_SIZE) {
            insertData();
            list.clear();
        }
    }
} else {
    insertImmediately(obj);
}

System.out.println("Latency2: " + (System.currentTimeMillis()-beginTime));
System.out.println("TotalLatency: " + (System.currentTimeMillis()-obj.getInitTime()));
collector.emit(new Values(obj));
}

private void insertImmediately(TransObj obj)
{
    Connection conn = null;
    Statement stmt = null;
    try {
        conn = DBManager.getConnection();
        String dml = String.format(INSERT_STRING, obj.getSeq(), obj.getInitTime(),
                System.currentTimeMillis(), (System.currentTimeMillis() - obj.getInitTime()));
        stmt = conn.createStatement();
        stmt.addBatch(dml);
        stmt.executeBatch();
        System.out.println("============Insert OK==============");
    } catch (SQLException e1) {
        e1.printStackTrace();
    } finally {
        // Close the statement and return the connection to the pool.
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

private void insertData()
{
    Connection conn = null;
    PreparedStatement prepStatement = null;
    try {
        conn = DBManager.getConnection();
        prepStatement = conn.prepareStatement(P_INSERT_STRING);
        // Queue one parameterized insert per buffered tuple, then execute them as a single batch.
        for (TransObj obj : list) {
            prepStatement.setInt(1, obj.getSeq());
            prepStatement.setLong(2, obj.getInitTime());
            prepStatement.setLong(3, System.currentTimeMillis());
            prepStatement.setLong(4, (System.currentTimeMillis() - obj.getInitTime()));
            prepStatement.addBatch();
        }
        prepStatement.executeBatch();
        System.out.println("============Insert OK==============");
    } catch (SQLException e1) {
        e1.printStackTrace();
    } finally {
        // Close the statement and return the connection to the pool.
        if (prepStatement != null) {
            try {
                prepStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
}
}
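The DBManager helper imported above (com.edi.storm.util.DBManager) is among the changed files not rendered in this view, so its actual implementation is not shown. Purely as an illustrative sketch of the getConnection() call sites used by WriteDB, a c3p0-backed helper could look like the following; the class body, driver class, JDBC URL, and credentials are placeholder assumptions, not values taken from this commit.

package com.edi.storm.util;

import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.SQLException;

import com.mchange.v2.c3p0.ComboPooledDataSource;

// Hypothetical sketch only; the real DBManager in this commit is not visible in this view.
public class DBManager {

    private static final ComboPooledDataSource DATA_SOURCE = new ComboPooledDataSource();

    static {
        try {
            // Placeholder connection settings.
            DATA_SOURCE.setDriverClass("com.mysql.jdbc.Driver");
            DATA_SOURCE.setJdbcUrl("jdbc:mysql://localhost:3306/test");
            DATA_SOURCE.setUser("user");
            DATA_SOURCE.setPassword("password");
        } catch (PropertyVetoException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static Connection getConnection() throws SQLException {
        // Borrow a pooled connection; callers close it to return it to the pool.
        return DATA_SOURCE.getConnection();
    }
}

With a pooled DataSource like this, the conn.close() calls in the finally blocks above simply return the connection to the pool.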
32 changes: 32 additions & 0 deletions src/main/java/com/edi/storm/bolts/ExclaimBasicBolt.java
@@ -0,0 +1,32 @@
package com.edi.storm.bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * A bolt that adds "!" to the end of each sentence it receives.
 *
 * @author Edison Xu
 *
 * Dec 30, 2013
 */
public class ExclaimBasicBolt extends BaseBasicBolt {

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//String sentence = tuple.getString(0);
String sentence = (String) tuple.getValue(0);
String out = sentence + "!";
collector.emit(new Values(out));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("excl_sentence"));
}

}
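A minimal usage sketch, not part of this commit: wiring ExclaimBasicBolt behind an arbitrary sentence-emitting spout with a plain TopologyBuilder. The component ids, the parallelism hint, and the spout parameter are illustrative assumptions.

package com.edi.storm;

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;

import com.edi.storm.bolts.ExclaimBasicBolt;

public class ExclaimTopologySketch {

    // Builds a topology around ExclaimBasicBolt; the spout is left as a parameter
    // because this commit does not include one.
    public static StormTopology build(IRichSpout sentenceSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", sentenceSpout);
        // shuffleGrouping spreads sentences evenly across the bolt's two executors;
        // each sentence is re-emitted as the "excl_sentence" field with "!" appended.
        builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("sentence-spout");
        return builder.createTopology();
    }
}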