I can't start my Kafka Streams application. It worked when I was running against Confluent Cloud, but after switching to Kafka in Docker it no longer starts.
docker-compose:
# https://docs.confluent.io/current/installation/docker/config-reference.html
# https://github.com/confluentinc/cp-docker-images
version: "3"
services:
  zookeeper:
    container_name: local-zookeeper
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      - 2181:2181
    hostname: zookeeper
    networks:
      - local_kafka_network
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    container_name: local-kafka
    image: confluentinc/cp-kafka:5.5.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    hostname: kafka
    networks:
      - local_kafka_network
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  schema-registry:
    container_name: local-schema-registry
    image: confluentinc/cp-schema-registry:5.5.1
    depends_on:
      - kafka
    ports:
      - 8081:8081
    hostname: schema-registry
    networks:
      - local_kafka_network
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
      - SCHEMA_REGISTRY_DEBUG=true
    command:
      - /bin/bash
      - -c
      - |
        # install jq
        curl -sL https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 -o /usr/local/bin/jq && chmod u+x /usr/local/bin/jq
        # start
        /etc/confluent/docker/run
  schema-registry-ui:
    container_name: local-schema-registry-ui
    image: landoop/schema-registry-ui:latest
    depends_on:
      - schema-registry
    ports:
      - 8001:8000
    hostname: schema-registry-ui
    networks:
      - local_kafka_network
    environment:
      - SCHEMAREGISTRY_URL=http://schema-registry:8081
      - PROXY=true
  kafka-rest:
    container_name: local-kafka-rest
    image: confluentinc/cp-kafka-rest:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8082:8082
    hostname: kafka-rest
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_REST_LISTENERS=http://kafka-rest:8082
      - KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KAFKA_REST_HOST_NAME=kafka-rest
  kafka-ui:
    container_name: local-kafka-ui
    image: landoop/kafka-topics-ui:latest
    depends_on:
      - kafka-rest
    ports:
      - 8000:8000
    hostname: kafka-ui
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_PROXY_URL=http://kafka-rest:8082
      - PROXY=true
  # https://github.com/confluentinc/ksql/blob/4.1.3-post/docs/tutorials/docker-compose.yml#L85
  ksql-server:
    container_name: local-ksql-server
    # TODO update 5.5.1
    image: confluentinc/cp-ksql-server:5.4.2
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8088:8088
    hostname: ksql-server
    networks:
      - local_kafka_network
    environment:
      - KSQL_BOOTSTRAP_SERVERS=kafka:29092
      - KSQL_LISTENERS=http://ksql-server:8088
      - KSQL_KSQL_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KSQL_KSQL_SERVICE_ID=local-ksql-server
  ksql-cli:
    container_name: local-ksql-cli
    # TODO update 5.5.1
    image: confluentinc/cp-ksql-cli:5.4.2
    depends_on:
      - ksql-server
    hostname: ksql-cli
    networks:
      - local_kafka_network
    entrypoint: /bin/sh
    tty: true
  # distributed mode
  kafka-connect:
    container_name: local-kafka-connect
    image: confluentinc/cp-kafka-connect:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    hostname: kafka-connect
    networks:
      - local_kafka_network
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=kafka:29092
      - CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect
      - CONNECT_REST_PORT=8083
      - CONNECT_GROUP_ID=local-connect-group
      - CONNECT_CONFIG_STORAGE_TOPIC=local-connect-configs
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
      - CONNECT_OFFSET_STORAGE_TOPIC=local-connect-offsets
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_STATUS_STORAGE_TOPIC=local-connect-status
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_PLUGIN_PATH=/usr/share/java
    volumes:
      - "./local/connect/data:/data"
    command:
      - /bin/bash
      - -c
      - |
        # install unzip
        apt-get update && apt-get install unzip -y
        # install plugin
        unzip /data/jcustenborder-kafka-connect-spooldir-*.zip 'jcustenborder-kafka-connect-spooldir-*/lib/*' -d /usr/share/java/kafka-connect-spooldir/
        mv /usr/share/java/kafka-connect-spooldir/*/lib/* /usr/share/java/kafka-connect-spooldir
        ls -la /usr/share/java
        # setup spooldir plugin
        mkdir -p /tmp/error /tmp/finished
        # start
        /etc/confluent/docker/run
  kafka-connect-ui:
    container_name: local-kafka-connect-ui
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - kafka-connect
    ports:
      - 8002:8000
    hostname: kafka-connect-ui
    networks:
      - local_kafka_network
    environment:
      - CONNECT_URL=http://kafka-connect:8083
networks:
  local_kafka_network:
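For completeness, I start the stack the usual way (assuming the file above is saved as docker-compose.yml):

docker-compose up -d          # start all services in the background
docker logs -f local-kafka    # tail the broker container (container_name above) to confirm it is healthy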
Main method:

package io.confluent.developer.time.solution;

import io.confluent.developer.StreamsUtils;
import io.confluent.developer.avro.ElectronicOrder;
import io.confluent.developer.time.TopicLoader;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class StreamsTimestampExtractor {

    static class OrderTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            ElectronicOrder order = (ElectronicOrder) record.value();
            System.out.println("Extracting time of " + order.getTime() + " from " + order);
            return order.getTime();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");

        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("extractor.input.topic");
        final String outputTopic = streamsProps.getProperty("extractor.output.topic");
        final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

        final SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);

        final KStream<String, ElectronicOrder> electronicStream =
                builder.stream(inputTopic,
                        Consumed.with(Serdes.String(), electronicSerde)
                                .withTimestampExtractor(new OrderTimestampExtractor()))
                        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));

        electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .aggregate(() -> 0.0,
                        (key, order, total) -> total + order.getPrice(),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                .toStream()
                .map((wk, value) -> KeyValue.pair(wk.key(), value))
                .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
        TopicLoader.runProducer();
        kafkaStreams.start();
    }
}

Running this code on my machine produces the records but then exits immediately:

Note that when I run this exact code against Confluent Cloud, I am able to process a continuous stream of data.
To reproduce locally, just grab the code from this Confluent tutorial, change the properties file to point at the local Kafka, and use the docker-compose file I provided above to set up Kafka.
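For reference, the relevant lines of my properties file look roughly like this; apart from the keys read in the main method above, the values (topic names in particular) are placeholders:

# broker and Schema Registry exposed by the docker-compose file above
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
# topics read via getProperty() in the main method (names are placeholders)
extractor.input.topic=extractor-input-topic
extractor.output.topic=extractor-output-topic
# carried over from the Confluent Cloud template
replication.factor=3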
Posted on 2022-06-10 06:10:49
Adding a shutdown hook and an uncaught-exception handler helped me diagnose and fix the problem:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();

// log is an org.slf4j.Logger; the handler type is
// org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
kafkaStreams.setUncaughtExceptionHandler(e -> {
    log.error("unhandled streams exception, shutting down.", e);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("Runtime shutdown hook, state={}", kafkaStreams.state());
    if (kafkaStreams.state().isRunningOrRebalancing()) {
        log.info("Shutting down started.");
        kafkaStreams.close(Duration.ofMinutes(2));
        log.info("Shutting down completed.");
    }
}));

kafkaStreams.start();

It turned out that the broker was configured with a replication factor of 1 while my properties file specified 3, so the exception was: Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1. My solution was therefore to lower replication.factor in the properties file from 3 to 1.
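Equivalently, since the loaded properties feed straight into the KafkaStreams constructor, the override can be applied in code before building the topology (a sketch using the standard StreamsConfig constant):

// a single-broker cluster cannot host 3 replicas, so match the broker count
streamsProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);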
https://stackoverflow.com/questions/72556566