首页
学习
活动
专区
圈层
工具
发布
社区首页 > 问答首页 > Kafka Streams 应用无法启动

Kafka Streams 应用无法启动
EN

Stack Overflow用户
提问于 2022-06-09 07:51:08
回答 1查看 173关注 0票数 0

我无法启动我的 Kafka Streams 应用程序。使用 Confluent Cloud 上的 Kafka 时它可以正常启动,但切换到在 Docker 中运行的本地 Kafka 之后,它就无法启动了。

docker-compose 文件:

代码语言:yaml
复制
# https://docs.confluent.io/current/installation/docker/config-reference.html
# https://github.com/confluentinc/cp-docker-images

version: "3"

services:

  # ZooKeeper node that the kafka, schema-registry and kafka-rest services
  # in this file connect to at zookeeper:2181.
  zookeeper:
    container_name: local-zookeeper
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "2181:2181"
    hostname: zookeeper
    networks:
      - local_kafka_network
    environment:
      # Port ZooKeeper listens on for client connections.
      - ZOOKEEPER_CLIENT_PORT=2181

  # The only Kafka broker defined in this file. Containers on
  # local_kafka_network reach it at kafka:29092; clients running on the
  # Docker host use localhost:9092.
  kafka:
    container_name: local-kafka
    image: confluentinc/cp-kafka:5.5.1
    depends_on:
      - zookeeper
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "9092:9092"
      - "29092:29092"
    hostname: kafka
    networks:
      - local_kafka_network
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # PLAINTEXT is advertised under the container hostname, PLAINTEXT_HOST
      # under localhost for clients outside the Compose network.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      # Both listener names use the unencrypted PLAINTEXT protocol.
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # With a single broker, replication factors above 1 cannot be satisfied.
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

  # Confluent Schema Registry, exposed on port 8081 and configured to use
  # the zookeeper service for its Kafka store connection.
  schema-registry:
    container_name: local-schema-registry
    image: confluentinc/cp-schema-registry:5.5.1
    depends_on:
      - kafka
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8081:8081"
    hostname: schema-registry
    networks:
      - local_kafka_network
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
      - SCHEMA_REGISTRY_DEBUG=true
    # Replaces the image's default command: downloads jq into the container
    # first, then hands over to the stock Confluent start script.
    command:
      - /bin/bash
      - -c
      - |
        # install jq
        curl -sL https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 -o /usr/local/bin/jq && chmod u+x /usr/local/bin/jq
        # start
        /etc/confluent/docker/run

  # Web UI for the schema-registry service; container port 8000 is published
  # on host port 8001.
  schema-registry-ui:
    container_name: local-schema-registry-ui
    image: landoop/schema-registry-ui:latest
    depends_on:
      - schema-registry
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8001:8000"
    hostname: schema-registry-ui
    networks:
      - local_kafka_network
    environment:
      # Address of the registry as seen from inside the Compose network.
      - SCHEMAREGISTRY_URL=http://schema-registry:8081
      - PROXY=true

  # Confluent REST Proxy, exposed on port 8082; the kafka-ui service below
  # points at it.
  kafka-rest:
    container_name: local-kafka-rest
    image: confluentinc/cp-kafka-rest:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8082:8082"
    hostname: kafka-rest
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_REST_LISTENERS=http://kafka-rest:8082
      - KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KAFKA_REST_HOST_NAME=kafka-rest

  # Kafka topics web UI, served on port 8000 and backed by the kafka-rest
  # service.
  kafka-ui:
    container_name: local-kafka-ui
    image: landoop/kafka-topics-ui:latest
    depends_on:
      - kafka-rest
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8000:8000"
    hostname: kafka-ui
    networks:
      - local_kafka_network
    environment:
      # Address of the REST proxy as seen from inside the Compose network.
      - KAFKA_REST_PROXY_URL=http://kafka-rest:8082
      - PROXY=true

  # KSQL server, exposed on port 8088 and connected to the broker's internal
  # listener and to the schema registry.
  # https://github.com/confluentinc/ksql/blob/4.1.3-post/docs/tutorials/docker-compose.yml#L85
  ksql-server:
    container_name: local-ksql-server
    # TODO: update this image to 5.5.1 (the other Confluent images here
    # already use 5.5.1).
    image: confluentinc/cp-ksql-server:5.4.2
    depends_on:
      - kafka
      - schema-registry
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8088:8088"
    hostname: ksql-server
    networks:
      - local_kafka_network
    environment:
      - KSQL_BOOTSTRAP_SERVERS=kafka:29092
      - KSQL_LISTENERS=http://ksql-server:8088
      - KSQL_KSQL_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KSQL_KSQL_SERVICE_ID=local-ksql-server
  # KSQL command-line client container. The entrypoint is overridden with a
  # shell and a TTY is allocated; no ports are published.
  ksql-cli:
    # TODO: update this image to 5.5.1 (the other Confluent images here
    # already use 5.5.1).
    image: confluentinc/cp-ksql-cli:5.4.2
    container_name: local-ksql-cli
    hostname: ksql-cli
    entrypoint: /bin/sh
    tty: true
    depends_on:
      - ksql-server
    networks:
      - local_kafka_network

  # Kafka Connect worker running in distributed mode, with its REST API on
  # port 8083. The spooldir connector plugin is installed at container start
  # from the archive mounted under /data.
  kafka-connect:
    container_name: local-kafka-connect
    image: confluentinc/cp-kafka-connect:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8083:8083"
    hostname: kafka-connect
    networks:
      - local_kafka_network
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=kafka:29092
      - CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect
      - CONNECT_REST_PORT=8083
      - CONNECT_GROUP_ID=local-connect-group
      # Internal topics all use replication factor 1 because this file
      # defines a single broker.
      - CONNECT_CONFIG_STORAGE_TOPIC=local-connect-configs
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
      - CONNECT_OFFSET_STORAGE_TOPIC=local-connect-offsets
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_STATUS_STORAGE_TOPIC=local-connect-status
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
      # Record keys and values are converted with Avro via the schema registry.
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      # The startup script below unpacks the plugin under this directory.
      - CONNECT_PLUGIN_PATH=/usr/share/java
    volumes:
      # Host directory that must contain the spooldir plugin zip archive.
      - "./local/connect/data:/data"
    # Replaces the image's default command: installs unzip, unpacks the
    # plugin jars, creates the /tmp/error and /tmp/finished directories,
    # then hands over to the stock Confluent start script.
    command:
      - /bin/bash
      - -c
      - |
        # install unzip
        apt-get update && apt-get install unzip -y
        # install plugin
        unzip /data/jcustenborder-kafka-connect-spooldir-*.zip 'jcustenborder-kafka-connect-spooldir-*/lib/*' -d /usr/share/java/kafka-connect-spooldir/
        mv /usr/share/java/kafka-connect-spooldir/*/lib/* /usr/share/java/kafka-connect-spooldir
        ls -la /usr/share/java
        # setup spooldir plugin
        mkdir -p /tmp/error /tmp/finished
        # start
        /etc/confluent/docker/run

  # Web UI for the kafka-connect service; container port 8000 is published
  # on host port 8002.
  kafka-connect-ui:
    container_name: local-kafka-connect-ui
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - kafka-connect
    ports:
      # HOST:CONTAINER mappings are quoted so YAML always reads them as strings.
      - "8002:8000"
    hostname: kafka-connect-ui
    networks:
      - local_kafka_network
    environment:
      # Address of the Connect REST API as seen from inside the Compose network.
      - CONNECT_URL=http://kafka-connect:8083

# Network shared by every service above; no options are set, so Compose
# creates it with its default settings.
networks:
  local_kafka_network: {}

主要方法:

代码语言:java
复制
package io.confluent.developer.time.solution;

import io.confluent.developer.StreamsUtils;
import io.confluent.developer.avro.ElectronicOrder;
import io.confluent.developer.time.TopicLoader;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka Streams example that sums order prices per key in one-hour windows,
 * using the time stored inside each {@link ElectronicOrder} as the record
 * timestamp.
 */
public class StreamsTimestampExtractor {

    /**
     * Timestamp extractor that takes the timestamp from the
     * {@link ElectronicOrder} carried as the record value.
     */
    static class OrderTimestampExtractor implements TimestampExtractor {
        /**
         * Prints the order and its time to stdout and returns that time.
         *
         * @param record        consumed record; its value is cast to {@link ElectronicOrder}
         * @param partitionTime not used by this implementation
         * @return the result of {@code getTime()} on the order
         */
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            final ElectronicOrder electronicOrder = (ElectronicOrder) record.value();
            final String message =
                    "Extracting time of " + electronicOrder.getTime() + " from " + electronicOrder;
            System.out.println(message);
            return electronicOrder.getTime();
        }
    }

    /**
     * Builds the windowed-aggregation topology, runs the topic loader and
     * starts the streams client.
     *
     * @param args not used
     * @throws IOException          declared by the original signature
     * @throws InterruptedException declared by the original signature
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        final Properties props = StreamsUtils.loadProperties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");

        final StreamsBuilder topology = new StreamsBuilder();
        final String sourceTopic = props.getProperty("extractor.input.topic");
        final String sinkTopic = props.getProperty("extractor.output.topic");

        // The serde is configured from the same properties as the streams client.
        final Map<String, Object> serdeConfig = StreamsUtils.propertiesToMap(props);
        final SpecificAvroSerde<ElectronicOrder> orderSerde =
                StreamsUtils.getSpecificAvroSerde(serdeConfig);

        // Read orders with String keys and stamp them via OrderTimestampExtractor.
        final Consumed<String, ElectronicOrder> consumedOrders =
                Consumed.with(Serdes.String(), orderSerde)
                        .withTimestampExtractor(new OrderTimestampExtractor());

        final KStream<String, ElectronicOrder> orders =
                topology.stream(sourceTopic, consumedOrders)
                        .peek((key, value) -> {
                            System.out.println("Incoming record - key " + key + " value " + value);
                        });

        // Sum the prices per key in one-hour windows, then drop the window
        // from the key before writing to the output topic.
        orders.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .aggregate(
                        () -> 0.0,
                        (key, order, runningTotal) -> runningTotal + order.getPrice(),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                .toStream()
                .map((windowedKey, sum) -> KeyValue.pair(windowedKey.key(), sum))
                .peek((key, value) -> {
                    System.out.println("Outgoing record - key " + key + " value " + value);
                })
                .to(sinkTopic, Produced.with(Serdes.String(), Serdes.Double()));

        final KafkaStreams streams = new KafkaStreams(topology.build(), props);
        TopicLoader.runProducer();
        streams.start();

    }
}

在我的机器上运行代码会生成记录,但会立即退出:

请注意,当我使用 Confluent Cloud 上的 Kafka 运行完全相同的代码时,我能够持续处理数据流。

要在本地复现,只需从这个 Confluent 教程获取代码,修改属性文件使其指向本地 Kafka,并使用我上面提供的 docker-compose 文件来搭建 Kafka。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-06-10 06:10:49

添加一个关闭钩子(shutdown hook)和一个未捕获异常处理器(uncaught exception handler)帮助我诊断并修复了这个问题:

代码语言:java
复制
        // Excerpt from main(): builds the streams client, then registers an
        // uncaught-exception handler and a JVM shutdown hook before starting it.
        // NOTE(review): `log` is not declared in this excerpt; presumably a logger
        // field on the enclosing class. Confirm before copying.
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
        TopicLoader.runProducer();
        // Log any exception passed to the handler and respond with
        // SHUTDOWN_APPLICATION, so the failure shows up in the log.
        kafkaStreams.setUncaughtExceptionHandler(e -> {
            log.error("unhandled streams exception, shutting down.", e);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        });

        // On JVM shutdown, log the current state and close the client only when it
        // is still running or rebalancing; close() is given a 2-minute timeout.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Runtime shutdown hook, state={}", kafkaStreams.state());
            if (kafkaStreams.state().isRunningOrRebalancing()) {
                log.info("Shutting down started.");
                kafkaStreams.close(Duration.ofMinutes(2));
                log.info("Shutting down completed.");
            }
        }));
        // Start only after the handler and the hook are registered.
        kafkaStreams.start();

原来,我在 broker 端配置的复制因子是 1,而在属性文件中配置的是 3,因此抛出了如下异常:Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1。所以我的解决方案是把属性文件中的 replication.factor 从 3 改为 1。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72556566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档