首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Storm-Kafka spout未在zookeeper集群中创建节点。

Storm-Kafka spout未在zookeeper集群中创建节点。
EN

Stack Overflow用户
提问于 2016-03-03 20:27:44
回答 1查看 1.2K关注 0票数 2

我用storm-kafka来标记storm 0.10和kafka 0.9.0.0。每当我在集群上运行我的拓扑时,它都会从头开始读取,尽管我将属性文件中的zkRoot和消费者groupId设置为-

代码语言:javascript
复制
kafka.zkHosts=myserver.myhost.com:2181
kafka.topic=onboarding-mail-topic
kafka.zkRoot=/kafka-storm
kafka.group.id=onboarding

喷嘴:

代码语言:javascript
复制
BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
                    String topicName = prop.getProperty("kafka.topic");
                    String zkRoot = prop.getProperty("kafka.zkRoot");
                    String groupId = prop.getProperty("kafka.group.id");

                    //kafka spout conf
                    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);

                    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

                    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

当我检查zookeeper ls /时,它没有显示kafka-storm

代码语言:javascript
复制
[controller_epoch, controller, brokers, storm, zookeeper, kafka-manager, admin, isr_change_notification, consumers, config]
EN

回答 1

Stack Overflow用户

发布于 2016-03-04 01:17:46

最后,我想通了。因为从kafka读取和写回kafka的偏移量是以不同的方式控制的。

如果您在storm集群上运行拓扑,而不管是单节点还是多节点,请确保在storm.yaml文件中设置了以下内容

代码语言:javascript
复制
storm.zookeeper.servers

代码语言:javascript
复制
storm.zookeeper.port

除zkHosts和zkRoot之外的属性以及使用者组id。

或者,最佳实践是通过在创建KafkaSpout时设置正确的值来覆盖拓扑中的这些属性,例如-

代码语言:javascript
复制
        BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
        String topicName = prop.getProperty("kafka.topic");
        String zkRoot = prop.getProperty("kafka.zkRoot");
        String groupId = prop.getProperty("kafka.group.id");
        String kafkaServers = prop.getProperty("kafka.zkServers");
        String zkPort = prop.getProperty("kafka.zkPort");
        //kafka spout conf
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        kafkaConfig.zkServers = Arrays.asList(kafkaServers);
        kafkaConfig.zkPort = Integer.valueOf(zkPort);

        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

或者甚至可以将这些值放入Config对象中。这样更好,因为您可能希望将偏移信息存储到其他zookeeper集群,而您的拓扑读取消息来自完全不同的代理

用于理解的KafkaSpout代码片段

代码语言:javascript
复制
 @Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
    _collector = collector;

    Map stateConf = new HashMap(conf);
    List<String> zkServers = _spoutConfig.zkServers;
    if (zkServers == null) {
        zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    }
    Integer zkPort = _spoutConfig.zkPort;
    if (zkPort == null) {
        zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    }
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
    _state = new ZkState(stateConf);

    _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

    // using TransactionalState like this is a hack
    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
    if (_spoutConfig.hosts instanceof StaticHosts) {
        _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    } else {
        _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35772199

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档