首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Node js storm客户端发送数据

Node js storm客户端发送数据
EN

Stack Overflow用户
提问于 2015-03-23 16:36:18
回答 1查看 1.3K关注 0票数 0

我尝试在Ubuntu 14.04中设置我的storm DRPC开发环境。我已经设置了Zookeeper,Nimbus,drcp服务器,supervisor,ui,并使它们运行并提交拓扑。

然后我有以下名为NodeClient.js的node.js客户端代码,当我尝试运行nodejs NodeClient.js时,什么也没有发生。我的Nodejs客户端出了什么问题,或者其他地方出了问题。

。我认为client.execute会将"hello world“数据包发送到storm队列。我说的对吗?

========================NodeClient.js=================

代码语言:javascript
复制
var DRPC = require('storm-drpc-node');


var client = DRPC({
    host: '127.0.0.1',
    port: 3772,
    timeout: 1000,
    keepAlive: true,
    maxConnectCounts: 30
});



client.on('error', function(err) {
    throw err;
});



// callback way 
client.execute('spout-name', 'hello world', function(err, res) {
    if(err) throw err;
    else console.log(res);

});

======Edited======

我启动了supervisor,现在它运行如上图所示,为了上面的NodeClient.js工作,我需要下载npm安装storm-node,并创建spout。NodeClient应该修改如下:?

===============Do我需要创建spout==============

代码语言:javascript
复制
 var storm = require('node-storm');
 var DRPC = require('storm-drpc-node');
var client = DRPC({
    host: '127.0.0.1',
    port: 3772,
    timeout: 1000,
    keepAlive: true,
    maxConnectCounts: 30
});

var myspout = storm.spout(function(sync) {
    // For an unreliable emit:
    this.emit([fieldValue1, fieldValue2])

    // For a reliable emit:
    this.emit([fieldValue1, fieldValue2], {id: 'some unique id'})

    // Tell storm we're done emitting tuples for now
    sync()
})
.declareOutputFields(["field1", "field2"]) // declare output fields
.on('fail', function(data) {
    // Handle tuple failure
   console.log('data is not send in myspount');
})
.on('ack', function(data) {
    // Handle tuple acknowledgement
    console.log('data is in myspout');
});

client.on('error', function(err) {
    throw err;
});

// callback way
client.execute('myspout',JSON.stringify('hello world'), function(err, res) {
    if(err) throw err;
    else console.log(res);
});

=============Editor 2============现在看起来服务器部分工作正常。请检查UI仪表板中的拓扑和输出。我使用的是apache-storm 0.9.3的示例BasicDRPCTopology。我使用remoteTopology。在代码中,没有喷嘴。我想知道仪表板上的喷嘴是从哪里来的?如何使用Nodejs作为Spout?

//提交拓扑命令。/storm jar ../examples/storm-starter/storm-starter-topologies-0.9.3.jar storm.starter.BasicDRPCTopology调用站

=======================BasicDRPCTopology============

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);

    Config conf = new Config();

    if (args == null || args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));




for (String word : new String[]{ "hello", "goodbye" }) {
    System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
  }

  cluster.shutdown();
  drpc.shutdown();
}
else {
  conf.setNumWorkers(3);
  StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}

==================edited 3=============

我把下面的Java客户端DRPC,它也不工作。我的客户部分是不是有问题。我在代码后面附加了我的storm.yaml :有什么提示吗?

public class TestSpout {

代码语言:javascript
复制
public static void main(String[] args) throws TException, DRPCExecutionException {
        DRPCClient client = new DRPCClient("127.0.0.1", 3772);
        String result = client.execute("callstatio", "hello world");
        System.out.println("result is:"+result);    
    }

}

=========storm.yaml====

代码语言:javascript
复制
storm.zookeeper.servers:
     - "localhost"

nimbus.host: "localhost"
storm.zookeeper.port: 2181
storm.local.dir: "/var/stormtmp/"
java.library.path: "/usr/local/lib/"
supervisor.slots.ports:
  -6700
  -6701
  -6702
  -6703

worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"


 drpc.servers:
     - "127.0.0.1"

================Edit 4=正在启动。/storm supervisor给出以下异常:有任何提示吗?java.lang.IllegalArgumentException:不知道如何从: java.lang.Integer创建ISeq

EN

回答 1

Stack Overflow用户

发布于 2015-03-23 20:57:59

您需要使用以下命令启动DRPC风暴

代码语言:javascript
复制
storm drpc 

然后在storm.yaml中添加DRPC服务器的url。

代码语言:javascript
复制
drpc.servers:
   - "my.ip.com"

别忘了使用

代码语言:javascript
复制
LinearDRPCTopologyBuilder

而不是在构建拓扑时使用传统的“TopologyBuilder”。

从storm (本地模式等)文档中查看更多信息

https://storm.apache.org/documentation/Distributed-RPC.html

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29206001

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档