我尝试在Ubuntu 14.04中设置我的storm DRPC开发环境。我已经设置了Zookeeper,Nimbus,drcp服务器,supervisor,ui,并使它们运行并提交拓扑。

然后我有以下名为NodeClient.js的node.js客户端代码,当我尝试运行nodejs NodeClient.js时,什么也没有发生。我的Nodejs客户端出了什么问题,或者其他地方出了问题。

。我认为client.execute会将"hello world“数据包发送到storm队列。我说的对吗?
========================NodeClient.js=================
var DRPC = require('storm-drpc-node');
var client = DRPC({
host: '127.0.0.1',
port: 3772,
timeout: 1000,
keepAlive: true,
maxConnectCounts: 30
});
client.on('error', function(err) {
throw err;
});
// callback way
client.execute('spout-name', 'hello world', function(err, res) {
if(err) throw err;
else console.log(res);});
======Edited======

我启动了supervisor,现在它运行如上图所示,为了上面的NodeClient.js工作,我需要下载npm安装storm-node,并创建spout。NodeClient应该修改如下:?
===============Do我需要创建spout==============
var storm = require('node-storm');
var DRPC = require('storm-drpc-node');
var client = DRPC({
host: '127.0.0.1',
port: 3772,
timeout: 1000,
keepAlive: true,
maxConnectCounts: 30
});
var myspout = storm.spout(function(sync) {
// For an unreliable emit:
this.emit([fieldValue1, fieldValue2])
// For a reliable emit:
this.emit([fieldValue1, fieldValue2], {id: 'some unique id'})
// Tell storm we're done emitting tuples for now
sync()
})
.declareOutputFields(["field1", "field2"]) // declare output fields
.on('fail', function(data) {
// Handle tuple failure
console.log('data is not send in myspount');
})
.on('ack', function(data) {
// Handle tuple acknowledgement
console.log('data is in myspout');
});
client.on('error', function(err) {
throw err;
});
// callback way
client.execute('myspout',JSON.stringify('hello world'), function(err, res) {
if(err) throw err;
else console.log(res);
});=============Editor 2============现在看起来服务器部分工作正常。请检查UI仪表板中的拓扑和输出。我使用的是apache-storm 0.9.3的示例BasicDRPCTopology。我使用remoteTopology。在代码中,没有喷嘴。我想知道仪表板上的喷嘴是从哪里来的?如何使用Nodejs作为Spout?


//提交拓扑命令。/storm jar ../examples/storm-starter/storm-starter-topologies-0.9.3.jar storm.starter.BasicDRPCTopology调用站
=======================BasicDRPCTopology============
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
if (args == null || args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
for (String word : new String[]{ "hello", "goodbye" }) {
System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
}
cluster.shutdown();
drpc.shutdown();
}
else {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
}==================edited 3=============
我把下面的Java客户端DRPC,它也不工作。我的客户部分是不是有问题。我在代码后面附加了我的storm.yaml :有什么提示吗?
public class TestSpout {
public static void main(String[] args) throws TException, DRPCExecutionException {
DRPCClient client = new DRPCClient("127.0.0.1", 3772);
String result = client.execute("callstatio", "hello world");
System.out.println("result is:"+result);
}
}=========storm.yaml====
storm.zookeeper.servers:
- "localhost"
nimbus.host: "localhost"
storm.zookeeper.port: 2181
storm.local.dir: "/var/stormtmp/"
java.library.path: "/usr/local/lib/"
supervisor.slots.ports:
-6700
-6701
-6702
-6703
worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"
drpc.servers:
- "127.0.0.1"================Edit 4=正在启动。/storm supervisor给出以下异常:有任何提示吗?java.lang.IllegalArgumentException:不知道如何从: java.lang.Integer创建ISeq
发布于 2015-03-23 20:57:59
您需要使用以下命令启动DRPC风暴
storm drpc 然后在storm.yaml中添加DRPC服务器的url。
drpc.servers:
- "my.ip.com"别忘了使用
LinearDRPCTopologyBuilder而不是在构建拓扑时使用传统的“TopologyBuilder”。
从storm (本地模式等)文档中查看更多信息
https://stackoverflow.com/questions/29206001
复制相似问题