1、storm拓扑应用开发步骤

storm拓扑的开发,主要有三个步骤:

  • 选择合适的spout

    storm提供的DRPCSpout用于同步请求场景,KestrelThriftSpout用于异步请求的场景,这两者已经能满足大多数需求,根据应用需要选择即可。

  • 开发bolt

    bolt开发最主要的工作是实现execute()回调函数,由bolt收到数据时调用。execute函数里实现的是具体的业务逻辑,获取输入数据、进行处理、输出新的数据

  • 开发topology,组合spout和bolt

    调用TopologyBuilder类的setSpout和setBolt方法,将spout和bolt组合成我们的应用。

2、storm 应用代码示例

下面是一个同步计算输入字符串的MD5值的storm应用,注释中详细介绍了各段代码的含义和用途。

public class Md5Topology {
   // 自定义的bolt继承BaseBasicBolt类
   public static class Md5Bolt extends BaseBasicBolt {
       @Override
       public void execute(Tuple tuple, BasicOutputCollector collector) { // tuple是输入,collector用于输出
           // 获取tuple的第一个字段
           String input = tuple.getString(0);                                              

           // 业务逻辑处理,这里就是简单的计算md5值
           String output = Md5Util.getMD5Str(input);                               

           // 调用emit接口输出,输出是2个字段,分别是输出的md5值、输入的第二个字段
           collector.emit(new Values(output, tuple.getString(1)));              
       }

       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           // 声明这个bolt的输出有两个字段,字段名分别是result和return-info
           declarer.declare(new Fields("result", "return-info"));                  
       }
   }

   public static void main(String[] args) throws Exception {
       TopologyBuilder builder = new TopologyBuilder();

       // 1. 设置spout,3个参数分别是:spout名字,spout对象,并发数
       builder.setSpout("DRPCSpout", new DRPCSpoutNew(args[0]), Integer.parseInt(args[1]));

       // 2. 开发bolt,3个参数分别是:bolt名字,bolt对象,并发数
       builder.setBolt("Md5Bolt", new Md5Bolt(), Integer.parseInt(args[2]))
                      .shuffleGrouping("DRPCSpout");  //指定上游为"DRPCSpout"

       // 3. 复用已有bolt,参数同上。ReturnResults是一个公共bolt,仅在同步场景中使用,负责将结果返回给drpc
       builder.setBolt("ReturnBolt", new ReturnResults(), Integer.parseInt(args[3]))
                      .shuffleGrouping("Md5Bolt");   //指定上游为"Md5Bolt"

       Config conf = new Config();

       // 4. 提交topology
       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
   }
}

3、storm应用目录结构

上述storm应用的源码结构如下:

strom_md5/
|-- build						# 编译工具自动生成、删除的目录
|-- lib							# 可选,依赖的jar包存放目录
|-- src							# 源码目录
|   |-- java 					# java类文件目录
|   |   |-- Md5Topology.java
|   |   `-- Md5Util.java
|   `-- resources				# 资源文件目录
`-- storm_md5.jar				# 打包后的jar包

示例源码包下载

4、运行storm应用

storm应用的运行分为本地模式和分布式模式两种模式。本地模式用于开发和测试,分布式模式是将topology提交到storm集群上执行。

要在分布式集群上运行,将示例代码打成jar包,然后用如下命令启动和停止

  • 启动topology

      # storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
      storm jar ./storm_md5.jar Md5Topology Md5Topology
    
  • 停止topology

      #storm kill 【拓扑名称】
      storm kill Md5Topology
    

如需要本地模式运行,可将上述示例代码中的以下部分:

// 4. 提交topology
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

换成下面的代码:

// 4. 本地模式运行topology
conf.setDebug(true); 
conf.setNumWorkers(2); 
LocalCluster cluster = new LocalCluster(); 
cluster.submitTopology(“test”, conf, builder.createTopology()); 
Utils.sleep(10000); 
cluster.killTopology(“test”); 
cluster.shutdown(); 

直接运行即可。

5、使用其他语言开发storm应用

storm集成了ShellBolt,用来支持使用其他语言开发storm应用。其他语言开发的bolt会被当做子进程来执行,storm通过stdin/stdout传递json格式的消息来和这些子进程通信。

更多有关ShellBolt的内容,请阅读官方文档:Using non-JVM languages with Storm.

本文链接:http://tabalt.net/blog/develop-and-run-storm-topology/,转载请注明。