首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Storm Toplology使用Kafka队列中的数据

使用Storm Toplology使用Kafka队列中的数据
EN

Stack Overflow用户
提问于 2015-12-17 11:14:41
回答 1查看 402关注 0票数 1

该算法使用KafkaSpout从卡夫卡队列中读取数据。

我正面临以下例外情况:

代码语言:javascript
复制
Exception in thread "main" java.lang.IllegalStateException: Couldn't initialize the topology
  at com.bridgera.iot.kafka.App.main(App.java:63)
Caused by: java.lang.IllegalArgumentException: Storm conf is not valid. Must be json-serializable
  at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:104)
  at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70)
  at com.bridgera.iot.kafka.App.main(App.java:60)

我的Java代码:

代码语言:javascript
复制
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
{
    String nimbusHost = "localhost";
    ZkHosts zkHosts=new ZkHosts("localhost:2181");
    String topic_name="test";
    String consumer_group_id="storm";
    String zookeeper_root="";
    SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, 
            topic_name, zookeeper_root, consumer_group_id);
    kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
    TopologyBuilder builder=new TopologyBuilder();
    //builder.setSpout("KafkaSpout", kafkaSpout, 1);
    builder.setSpout("KafkaSpout", kafkaSpout);
    builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
    conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
    conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
    conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
    conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
    LocalCluster cluster=new LocalCluster();
    try{
        cluster.submitTopology("KafkaConsumerTopology", conf, builder.createTopology());
        Thread.sleep(120000);
    }catch (Exception e) {
        throw new IllegalStateException("Couldn't initialize the topology", e);
    }
}

让我知道我在配置中做错了什么。仅供参考:我在AWS集群中运行Zookeeper和Storm JVM (本地执行)。

EN

回答 1

Stack Overflow用户

发布于 2015-12-17 11:42:34

试试这个:

代码语言:javascript
复制
List<String> hosts = Arrays.asList("localhost")
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, hosts);

我认为这是从Arrays.asList("localhost")返回的java.util.AbstractList<Object>和对json的解析的问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34326020

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档