1、Topology的构造
backtype.storm.topology.TopologyBuilder 2、Spout组件的编写实现接口 backtype.storm.topology.IRichSpout; 或者继承backtype.storm.topology.base.BaseRichSpout; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub}
open 方法,是spout的组件初始化方法,而且Spout实例创建后首先被调用,只调用一次 @Overridepublic void close() { // 对于资源的释放关闭,可以在该方法中实现}
@Override
public void nextTuple() { // 实现如何从数据源上获取数据的逻辑 // 以及向后面的组件bolt发射数据}nextTuple 循环调用
@Overridepublic void ack(Object msgId) {}
Topology启用了消息可靠性保障机制,当某个Tuple在Topology上处理成功后,调用ack方法执行一些消息处理成功后该干的事情
@Override
public void fail(Object msgId) { // Topology启用了消息可靠性保障机制,某个Tuple在后面处理失败,该干什么// 比如重试,重试达到最大可重试(比如三次)就丢弃
} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 声明向后面组件发射的Tuple keys依次是什么}
@Override
public Map<String, Object> getComponentConfiguration() { // 设置该组件Spout一些专用的参数 return null; }kafkaSpout 向后发射的Tuple {"str":"msg"}
注意点:
Topology中使用的一些类,最好都要实现序列化接口 java.io.Serializable3、Bolt组件实现backtype.storm.topology.IRichBolt 或者继承backtype.storm.topology.base.BaseRichBolt
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { //类似于spout中open方法}
SpoutOutputCollector spout组件中tuple的发射器
OutputCollector bolt组件中tuple发射器
@Override
public void execute(Tuple input) { // TODO Auto-generated method stub}
execute 类似于Spout的nextTuple方法 @Override public void cleanup() { // TODO Auto-generated method stub}
类似于spout中close方法
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { // 声明向后面组件发射的Tuple keys依次是什么}
@Override
public Map<String, Object> getComponentConfiguration() { // 设置该组件Spout一些专用的参数 return null; } 4、数据流分组 方式shuffleGrouping 随机分配fieldsGrouping 根据key分组进行分配globalGrouping 全局分组 只会将tuple往后面组件中固定一个上发送
5、消息可靠性保障机制
启用消息可靠性保障机制:ack、fail
Spout端:
1)发射器发射tuple时,需要指定一个msgID collector.emit(new Values(sentence),mssageId ); 2)使用缓存所发射的tuple,Map key=msgID,value = Valuesprivate Map<Object,Values> tuples;
3)ack方法 // 确认发射成功,将tuple从缓存中移除 tuples.remove(msgId); 4)fail方法 重试 // 重试 Values values = tuples.get(msgId); // 重新发射 collector.emit(values,msgId ); Bolt端: 1)如果bolt端继续往后面组件发射,需要锚定前面的tuple // 启用消息可靠性保障机制,需要锚定接收到tuple collector.emit(input,new Values(word)); 2)处理完tuple后 // 确认处理结束 collector.ack(input);try{
}catch{ // 处理失败 collector.fail(input); }