博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm之详解spout、blot
阅读量:6034 次
发布时间:2019-06-20

本文共 2391 字,大约阅读时间需要 7 分钟。

1、Topology的构造

backtype.storm.topology.TopologyBuilder
2、Spout组件的编写
实现接口 backtype.storm.topology.IRichSpout;
或者继承backtype.storm.topology.base.BaseRichSpout;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub

}

open 方法,是spout的组件初始化方法,而且Spout实例创建后首先被调用,只调用一次

@Override
public void close() {
// 对于资源的释放关闭,可以在该方法中实现
}

 

@Override

public void nextTuple() {
// 实现如何从数据源上获取数据的逻辑
// 以及向后面的组件bolt发射数据
}

nextTuple 循环调用

@Override
public void ack(Object msgId) {

}

Topology启用了消息可靠性保障机制,当某个Tuple在Topology上处理成功后,调用ack方法执行一些消息处理成功后该干的事情

 

@Override

public void fail(Object msgId) {
// Topology启用了消息可靠性保障机制,某个Tuple在后面处理失败,该干什么

// 比如重试,重试达到最大可重试(比如三次)就丢弃

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明向后面组件发射的Tuple keys依次是什么

}

 

@Override

public Map<String, Object> getComponentConfiguration() {
// 设置该组件Spout一些专用的参数
return null;
}
kafkaSpout 向后发射的Tuple {"str":"msg"}

 

注意点:

Topology中使用的一些类,最好都要实现序列化接口 java.io.Serializable
3、Bolt组件
实现backtype.storm.topology.IRichBolt
或者继承backtype.storm.topology.base.BaseRichBolt

 

@Override

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//类似于spout中open方法

}

 

SpoutOutputCollector spout组件中tuple的发射器

OutputCollector bolt组件中tuple发射器

 

@Override

public void execute(Tuple input) {
// TODO Auto-generated method stub

}

execute 类似于Spout的nextTuple方法

@Override
public void cleanup() {
// TODO Auto-generated method stub

}

类似于spout中close方法

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明向后面组件发射的Tuple keys依次是什么

}

 

@Override

public Map<String, Object> getComponentConfiguration() {
// 设置该组件Spout一些专用的参数
return null;
}
4、数据流分组 方式
shuffleGrouping 随机分配
fieldsGrouping 根据key分组进行分配
globalGrouping 全局分组 只会将tuple往后面组件中固定一个上发送

 

5、消息可靠性保障机制

启用消息可靠性保障机制:ack、fail

 

 

Spout端:

1)发射器发射tuple时,需要指定一个msgID
collector.emit(new Values(sentence),mssageId );
2)使用缓存所发射的tuple,Map key=msgID,value = Values

private Map<Object,Values> tuples;

3)ack方法
// 确认发射成功,将tuple从缓存中移除
tuples.remove(msgId);

4)fail方法
重试
// 重试
Values values = tuples.get(msgId);
// 重新发射
collector.emit(values,msgId );

Bolt端:
1)如果bolt端继续往后面组件发射,需要锚定前面的tuple
// 启用消息可靠性保障机制,需要锚定接收到tuple
collector.emit(input,new Values(word));
2)处理完tuple后
// 确认处理结束
collector.ack(input);

try{

}catch{
// 处理失败
collector.fail(input);
}

转载于:https://www.cnblogs.com/WardSea/p/7366163.html

你可能感兴趣的文章
三分钟了解实时流式大数据分析
查看>>
留与后人一段面试的总结
查看>>
Spring基于XML方式配置事务
查看>>
T-MBA学习营 | 寒窗十数载,我们原来并不会学习?
查看>>
log4j.properties模板
查看>>
Linux:信号(上)
查看>>
vmware虚拟化无法迁移虚拟机
查看>>
SQL UPDATE实现多表更新
查看>>
最近有个需求,就是把某个网址跳转到另外一个网址
查看>>
innobackupex 在增量的基础上增量备份
查看>>
Windows Server 2012 R2 DirectAccess功能测试(2)App1服务器安装及配置
查看>>
基于清单的启动器的实现
查看>>
外网用户通过citrix打印慢的解决方法
查看>>
STL容器的使用
查看>>
关于std::map
查看>>
JXL导出Excel文件兼容性问题
查看>>
VBoot1.0发布,Vue & SpringBoot 综合开发入门
查看>>
centos7 安装wps 后 演示无法启动
查看>>
git简单命令
查看>>
LAMP编译部署
查看>>