目标:我想从一个主题分析标题,我正在寻找一些直接的方法来看到标题。因此,我不想仅为此而开发额外的应用程序或扩展代码。任何直接查看标题的工具都会很有用。
我从this question 上读到,kafkacat可以这样做
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'因此,我希望启动kafkacat作为我的docker-compose的一部分,我的第一次尝试是
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
kafkacat:
image: confluentinc/cp-kafkacat
command:
- bash
- -c
links:
- broker我得到了
broker | [2020-12-11 10:46:13,084] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
broker | [2020-12-11 10:46:13,084] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
confluentinc_kafkacat_1 exited with code 2
broker | [2020-12-11 10:51:13,085] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)基于kafkacat tutorial的第二次尝试
C:\Users\DEMETRC>docker run --tty confluentinc/cp-kafkacat kafkacat -b localhost:9092 -L
% ERROR: Failed to acquire metadata: Local: Broker transport failure注:我的docker kafka在没有docker网络的localhost:9092中可用。在教程中的示例中,其kafka在Docker网络docker-compose_default上的kafka:29092中可用
你知道怎么把kafkacat添加到我的docker-compose中吗?还有没有其他建议来调查邮件头?
* Robin Moffatt回答后的第一次编辑
我编辑了docker-compose添加了Robin的建议
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka
container_name: kafka
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
kafkacat:
image: edenhill/kafkacat:1.6.0
container_name: kafkacat
links:
- broker
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done下面是我的主题描述
# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demotopic
Created topic demotopic.
# kafka-topics --describe --bootstrap-server localhost:9092 --topic demotopic
Topic: demotopic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: demotopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
#我尝试使用kafkacat时遇到的错误
/ # kafkacat -b localhost:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p
Offset: %o Headers: %h\n'
%3|1607709622.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1607709623.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
% ERROR: Failed to query metadata for topic demotopic: Local: Broker transport failure
/ # *第二次编辑
现在,我可以使用kafkacat列出所有主题,但仍然无法获取消息标头
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "demotopic" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-license" with 1 partitions:
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "demotopic" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-license" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_confluent-metrics" with 12 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
partition 3, leader 1, replicas: 1, isrs: 1
partition 4, leader 1, replicas: 1, isrs: 1
partition 5, leader 1, replicas: 1, isrs: 1
partition 6, leader 1, replicas: 1, isrs: 1
partition 7, leader 1, replicas: 1, isrs: 1
partition 8, leader 1, replicas: 1, isrs: 1
partition 9, leader 1, replicas: 1, isrs: 1
partition 10, leader 1, replicas: 1, isrs: 1
partition 11, leader 1, replicas: 1, isrs: 1
topic "__confluent.support.metrics" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
/ # kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Of
fset: %o Headers: %h\n'
%3|1607719862.133|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Local: Broker transport failure: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)这个kafkacat脚本有什么问题吗?
kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'发布于 2020-12-11 21:46:48
confluentinc_kafkacat_1 exited with code 2是因为容器退出了,因为您覆盖了启动命令,只启动了bash;它确实这样做了,然后退出
command:
- bash
- -c 此file显示了所需内容的工作示例,在该示例中容器将保持运行
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done对于Aydin的观点,你在Docker中不需要kafkacat -你可以在本地做,但我发现在Docker Compose中包含kafkacat更容易,这样你就不需要依赖于本地安装。
如果你想在Docker Compose之外运行kafkacat,但仍然在Docker和docker run中运行kafkacat,你可以做到,但要记住networking implications。如果你尝试使用localhost,那么这是相对于容器本身的,也就是运行kafkacat的地方。相反,您需要使Kafka代理可供kafkacat容器访问,例如,通过添加到主机网络:
docker run --network host --interactive --rm edenhill/kafkacat:1.6.0 -b localhost:9092 -Lhttps://stackoverflow.com/questions/65250293
复制相似问题