搜索
您的当前位置:首页正文

storm drpc

来源:二三娱乐

客户端:用来发起DRPC的调用

DRPC Server:实现与客户端的对接,传递参数给Storm,返回结果给客户端。

DPRCSpout: 用于连接DRPC Server和Topology,传递参数给Topology。

Topology:实现实际的函数功能。

ReturnResults:用于连接DRPC Server和Topology,用于返回参数给DRPC Server。

流程描述如下:

客户端发送函数的参数给DRPC Server

DRPC Server生成发送函数调用的相关信息给DRPC Spout,包括请求ID,请求参数,返回结果的信息。

DRPC Spout发送["args", "return-info"]给Topology的第一个Bolt,其中args代表请求参数,return-info代表返回需要的信息。

Topology最后一个Bolt发送["result", "return-info"]给ReturnResults,其中result代表返回结果,return-info代表返回需要的信息。

ReturnResults将结果和返回需要的信息传递给DRPC Server

DRPC Server将结果返回给DRPC客户端。

Storm DPRC API介绍

我们先看一下DRPC客户端的API:

假设DRPC服务器的地址为172.16.32.105,函数名为“exclamation”,输入为“hello world”

DRPCClient的java用法

package opzoon;

import java.util.Map;

import org.apache.storm.utils.Utils;

import org.apache.storm.utils.DRPCClient;

public class MyDrpcClient{

    public static void main(String[] args) throws Exception{

        Map config = Utils.readDefaultConfig();

        DRPCClient client = new DRPCClient(config,"172.16.32.105", 3772);

        try{

            String result = client.execute("exclamation","hello world");

        } catch (Exception e) {

            System.out.println(e.getMessage());

        }

    }

}

这里说明一下,storm1.0.0之前的的DRPCClient构造函数是两个参数的,

DRPCClient client=newDRPCClient("172.16.32.105", 3772);

storm1.0.0之后的的DRPCClient构造函数是三个参数的,

Map config = Utils.readDefaultConfig();

DRPCClient client = new DRPCClient(config, "172.16.32.105", 3772);

DRPCClient的curl用法

然后再看一下DRPC服务器端的API:

package org.apache.storm.starter;

import org.apache.storm.Config;

import org.apache.storm.StormSubmitter;

import org.apache.storm.drpc.DRPCSpout;

import org.apache.storm.drpc.ReturnResults;

import org.apache.storm.topology.BasicOutputCollector;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.topology.base.BaseBasicBolt;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;

public class ManualDRPC {

    public static class ExclamationBolt extends BaseBasicBolt {

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

             declarer.declare(new Fields("result", "return-info"));

        }

        @Override

        public void execute(Tuple tuple, BasicOutputCollector collector) {

            String arg = tuple.getString(0);

            Object retInfo = tuple.getValue(1);

            collector.emit(new Values(arg + "!!!", retInfo));

        }

    }

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        DRPCSpout spout = new DRPCSpout("exclamation");

        builder.setSpout("drpc", spout);

        builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");

        builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

        Config conf = new Config();

        StormSubmitter.submitTopologyWithProgressBar("exclaim", conf, builder.createTopology());

    }

}

Top