Skip to content

Commit

Permalink
update print
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed Jan 2, 2014
1 parent dd19ffd commit 44e6670
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/main/java/com/edi/storm/bolts/PrintBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.Map;

import com.edi.storm.util.PrintHelper;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
Expand All @@ -27,7 +29,7 @@ public void prepare(Map stormConf, TopologyContext context) {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String rec = tuple.getString(0);
System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));
PrintHelper.print(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));
}

@Override
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/com/edi/storm/spouts/RandomSpout.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package com.edi.storm.spouts;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import com.edi.storm.util.PrintHelper;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
*
Expand All @@ -24,24 +30,42 @@ public class RandomSpout extends BaseRichSpout {

private Random rand;

private AtomicInteger counter;

private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.rand = new Random();
counter = new AtomicInteger();
}

@Override
public void nextTuple() {
Utils.sleep(5000);
String toSay = sentences[rand.nextInt(sentences.length)];
this.collector.emit(new Values(toSay));
int msgId = this.counter.getAndIncrement();
toSay = "["+ msgId + "]"+ toSay;
PrintHelper.print("Send " + toSay );

this.collector.emit(new Values(toSay), msgId);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}

@Override
public void ack(Object msgId) {
PrintHelper.print("ack " + msgId);
}

@Override
public void fail(Object msgId) {
PrintHelper.print("fail " + msgId);
}

}
2 changes: 2 additions & 0 deletions src/main/java/com/edi/storm/topos/ExclaimBasicTopo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import backtype.storm.utils.Utils;

import com.edi.storm.bolts.ExclaimBasicBolt;
import com.edi.storm.bolts.ExclaimRichBolt;
import com.edi.storm.bolts.PrintBolt;
import com.edi.storm.spouts.RandomSpout;

Expand All @@ -23,6 +24,7 @@ public static void main(String[] args) throws Exception {

builder.setSpout("spout", new RandomSpout());
builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
//builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout");
builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");

Config conf = new Config();
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/edi/storm/util/PrintHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.edi.storm.util;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* @author Edison Xu
*
* Jan 2, 2014
*/
public class PrintHelper {

private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");

public static void print(String out){
System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
}

}

0 comments on commit 44e6670

Please sign in to comment.