首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何手动分析kafka消息主题中的headers?使用代码2退出confluentinc_kafkacat_1

如何手动分析kafka消息主题中的headers?使用代码2退出confluentinc_kafkacat_1
EN

Stack Overflow用户
提问于 2020-12-11 19:14:39
回答 1查看 225关注 0票数 0

目标:我想从一个主题分析标题,我正在寻找一些直接的方法来看到标题。因此,我不想仅为此而开发额外的应用程序或扩展代码。任何直接查看标题的工具都会很有用。

我从this question 上读到,kafkacat可以这样做

代码语言:javascript
复制
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

因此,我希望启动kafkacat作为我的docker-compose的一部分,我的第一次尝试是

代码语言:javascript
复制
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  kafkacat:
    image: confluentinc/cp-kafkacat
    command: 
      - bash 
      - -c 
    links:
      - broker

我得到了

代码语言:javascript
复制
broker         | [2020-12-11 10:46:13,084] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
broker         | [2020-12-11 10:46:13,084] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
confluentinc_kafkacat_1 exited with code 2
broker         | [2020-12-11 10:51:13,085] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)

基于kafkacat tutorial的第二次尝试

代码语言:javascript
复制
C:\Users\DEMETRC>docker run --tty confluentinc/cp-kafkacat kafkacat -b localhost:9092 -L
% ERROR: Failed to acquire metadata: Local: Broker transport failure

注:我的docker kafka在没有docker网络的localhost:9092中可用。在教程中的示例中,其kafka在Docker网络docker-compose_default上的kafka:29092中可用

你知道怎么把kafkacat添加到我的docker-compose中吗?还有没有其他建议来调查邮件头?

* Robin Moffatt回答后的第一次编辑

我编辑了docker-compose添加了Robin的建议

代码语言:javascript
复制
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

下面是我的主题描述

代码语言:javascript
复制
# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demotopic
Created topic demotopic.
# kafka-topics --describe --bootstrap-server localhost:9092 --topic demotopic
Topic: demotopic        PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: demotopic        Partition: 0    Leader: 1       Replicas: 1     Isr: 1
#

我尝试使用kafkacat时遇到的错误

代码语言:javascript
复制
/ # kafkacat -b localhost:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p
 Offset: %o Headers: %h\n'
%3|1607709622.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1607709623.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
% ERROR: Failed to query metadata for topic demotopic: Local: Broker transport failure
/ #  

*第二次编辑

现在,我可以使用kafkacat列出所有主题,但仍然无法获取消息标头

代码语言:javascript
复制
/ #  kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
 1 brokers:
  broker 1 at localhost:9092 (controller)
 4 topics:
  topic "demotopic" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-license" with 1 partitions:
/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
 1 brokers:
  broker 1 at localhost:9092 (controller)
 4 topics:
  topic "demotopic" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-license" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-metrics" with 12 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
    partition 3, leader 1, replicas: 1, isrs: 1
    partition 4, leader 1, replicas: 1, isrs: 1
    partition 5, leader 1, replicas: 1, isrs: 1
    partition 6, leader 1, replicas: 1, isrs: 1
    partition 7, leader 1, replicas: 1, isrs: 1
    partition 8, leader 1, replicas: 1, isrs: 1
    partition 9, leader 1, replicas: 1, isrs: 1
    partition 10, leader 1, replicas: 1, isrs: 1
    partition 11, leader 1, replicas: 1, isrs: 1
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
/ # kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Of
fset: %o Headers: %h\n'
%3|1607719862.133|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Local: Broker transport failure: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)

这个kafkacat脚本有什么问题吗?

代码语言:javascript
复制
kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
EN

回答 1

Stack Overflow用户

发布于 2020-12-11 21:46:48

confluentinc_kafkacat_1 exited with code 2是因为容器退出了,因为您覆盖了启动命令,只启动了bash;它确实这样做了,然后退出

代码语言:javascript
复制
   command: 
      - bash 
      - -c 

file显示了所需内容的工作示例,在该示例中容器将保持运行

代码语言:javascript
复制
 entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

对于Aydin的观点,你在Docker中不需要kafkacat -你可以在本地做,但我发现在Docker Compose中包含kafkacat更容易,这样你就不需要依赖于本地安装。

如果你想在Docker Compose之外运行kafkacat,但仍然在Docker和docker run中运行kafkacat,你可以做到,但要记住networking implications。如果你尝试使用localhost,那么这是相对于容器本身的,也就是运行kafkacat的地方。相反,您需要使Kafka代理可供kafkacat容器访问,例如,通过添加到主机网络:

代码语言:javascript
复制
docker run --network host --interactive --rm edenhill/kafkacat:1.6.0 -b localhost:9092 -L
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65250293

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档