From 44e6670c0ce54e0f02d9487c5f9bb208753d3b31 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 2 Jan 2014 16:30:41 +0800 Subject: [PATCH] update print --- .../java/com/edi/storm/bolts/PrintBolt.java | 4 ++- .../com/edi/storm/spouts/RandomSpout.java | 26 ++++++++++++++++++- .../com/edi/storm/topos/ExclaimBasicTopo.java | 2 ++ .../java/com/edi/storm/util/PrintHelper.java | 19 ++++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/edi/storm/util/PrintHelper.java diff --git a/src/main/java/com/edi/storm/bolts/PrintBolt.java b/src/main/java/com/edi/storm/bolts/PrintBolt.java index 55e7384..11ae995 100644 --- a/src/main/java/com/edi/storm/bolts/PrintBolt.java +++ b/src/main/java/com/edi/storm/bolts/PrintBolt.java @@ -2,6 +2,8 @@ import java.util.Map; +import com.edi.storm.util.PrintHelper; + import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; @@ -27,7 +29,7 @@ public void prepare(Map stormConf, TopologyContext context) { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String rec = tuple.getString(0); - System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec)); + PrintHelper.print(String.format("Bolt[%d] String recieved: %s",this.indexId, rec)); } @Override diff --git a/src/main/java/com/edi/storm/spouts/RandomSpout.java b/src/main/java/com/edi/storm/spouts/RandomSpout.java index 9a91d39..999c124 100644 --- a/src/main/java/com/edi/storm/spouts/RandomSpout.java +++ b/src/main/java/com/edi/storm/spouts/RandomSpout.java @@ -1,7 +1,12 @@ package com.edi.storm.spouts; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import com.edi.storm.util.PrintHelper; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; @@ -9,6 +14,7 @@ import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; /** * @@ -24,6 +30,8 @@ public class RandomSpout extends BaseRichSpout { private Random rand; + private AtomicInteger counter; + private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"}; @Override @@ -31,17 +39,33 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.rand = new Random(); + counter = new AtomicInteger(); } @Override public void nextTuple() { + Utils.sleep(5000); String toSay = sentences[rand.nextInt(sentences.length)]; - this.collector.emit(new Values(toSay)); + int msgId = this.counter.getAndIncrement(); + toSay = "["+ msgId + "]"+ toSay; + PrintHelper.print("Send " + toSay ); + + this.collector.emit(new Values(toSay), msgId); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } + + @Override + public void ack(Object msgId) { + PrintHelper.print("ack " + msgId); + } + + @Override + public void fail(Object msgId) { + PrintHelper.print("fail " + msgId); + } } diff --git a/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java b/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java index fd00902..6e61acd 100644 --- a/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java +++ b/src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java @@ -8,6 +8,7 @@ import backtype.storm.utils.Utils; import com.edi.storm.bolts.ExclaimBasicBolt; +import com.edi.storm.bolts.ExclaimRichBolt; import com.edi.storm.bolts.PrintBolt; import com.edi.storm.spouts.RandomSpout; @@ -23,6 +24,7 @@ public static void main(String[] args) throws Exception { builder.setSpout("spout", new RandomSpout()); builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout"); + //builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout"); builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim"); Config conf = new Config(); diff --git a/src/main/java/com/edi/storm/util/PrintHelper.java b/src/main/java/com/edi/storm/util/PrintHelper.java new file mode 100644 index 0000000..8ffdb59 --- /dev/null +++ b/src/main/java/com/edi/storm/util/PrintHelper.java @@ -0,0 +1,19 @@ +package com.edi.storm.util; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * @author Edison Xu + * + * Jan 2, 2014 + */ +public class PrintHelper { + + private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS"); + + public static void print(String out){ + System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out); + } + +}