Skip to content

Commit

Permalink
add new source code for post intro Topology
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed Jan 13, 2014
1 parent 44e6670 commit a562296
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 4 deletions.
27 changes: 27 additions & 0 deletions src/main/java/com/edi/storm/topos/ClusterRunningTopology.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.edi.storm.topos;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;

/**
* A sample topology that will only running in the cluster.
*
* @author Edison Xu
*
* Jan 13, 2014
*/
public class ClusterRunningTopology extends ExclaimBasicTopo {

public static void main(String[] args) throws Exception {

String topoName = "test";

ClusterRunningTopology topo = new ClusterRunningTopology();
Config conf = new Config();
conf.setDebug(false);

conf.setNumWorkers(3);

StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());
}
}
15 changes: 11 additions & 4 deletions src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

Expand All @@ -19,25 +20,31 @@
*/
public class ExclaimBasicTopo {

public static void main(String[] args) throws Exception {
protected StormTopology buildTopology()
{
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSpout());
builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
//builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout");
builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");

return builder.createTopology();
}

public static void main(String[] args) throws Exception {

ExclaimBasicTopo topo = new ExclaimBasicTopo();
Config conf = new Config();
conf.setDebug(false);

if (args != null && args.length > 0) {
conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
StormSubmitter.submitTopology(args[0], conf, topo.buildTopology());
} else {

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
cluster.submitTopology("test", conf, topo.buildTopology());
Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/edi/storm/topos/LocalRunningTopology.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.edi.storm.topos;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.utils.Utils;

/**
* A sample topology running only under Local cluster model.
*
* @author Edison Xu
*
* Jan 13, 2014
*/
public class LocalRunningTopology extends ExclaimBasicTopo {

public static void main(String[] args) throws Exception {

LocalRunningTopology topo = new LocalRunningTopology();
Config conf = new Config();
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, topo.buildTopology());
Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
}
}
36 changes: 36 additions & 0 deletions src/main/java/com/edi/storm/topos/RemoteRunningTopology.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.edi.storm.topos;

import java.io.File;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;

import com.edi.storm.util.EJob;

/**
* @author Edison Xu
*
* Jan 13, 2014
*/
public class RemoteRunningTopology extends ExclaimBasicTopo {

public static void main(String[] args) throws Exception {

String topoName = "test";
RemoteRunningTopology topo = new RemoteRunningTopology();
Config conf = new Config();
conf.setDebug(false);

File jarFile = EJob.createTempJar(RemoteRunningTopology.class.getClassLoader().getResource("").getPath());
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);

//System.setProperty("storm.jar", Class.forName("com.edi.storm.topos.RemoteRunningTopology").getProtectionDomain().getCodeSource().getLocation().getPath());
System.setProperty("storm.jar", jarFile.toString());
conf.setNumWorkers(5);
conf.setDebug(false);
conf.put(Config.NIMBUS_HOST, "10.1.110.24");
//conf.put(Config.NIMBUS_THRIFT_PORT, 8889);
StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());
}
}
Loading

0 comments on commit a562296

Please sign in to comment.