首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从另一个容器中的flink作业写入码头容器中的kafka主题

从另一个容器中的flink作业写入码头容器中的kafka主题
EN

Stack Overflow用户
提问于 2021-09-10 01:46:32
回答 1查看 551关注 0票数 1

我正在努力学习Flink,Docker和Kafka,所以我正在尝试设置一个简单的虚拟设置,让Flink和Kafka从不同的容器中进行通信。我正在本地完成所有这些工作,但我也需要了解如何通过从不同位置运行的容器来实现这一点,所以我不想仅仅为了让它正常工作而采取任何捷径。

我有一个flink作业,目前只是scala中欺诈检测示例的精简版本。我在一个虚拟机上运行了Kafka和Flink,所以这里唯一相关的部分可能是引导服务器。以下是工作的全部主要功能的内容:

代码语言:javascript
复制
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val transactions: DataStream[Transaction] = env
  .addSource(new TransactionSource)
  .name("transactions")

val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")

val myProducer = new FlinkKafkaProducer[Transaction](
  "flagged-transactions",                  // target topic
  TransactionSchema("flagged-transactions"),
  properties,                  // producer config
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance

transactions.addSink(myProducer)
env.execute("transactions")

我用docker network create flink-network启动了一个码头网络。然后用docker run -d --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest jobmanager启动flink,用docker run -d --rm --name=taskmanager --network flink-network --publish 9092:9092 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest taskmanager启动任务管理器

目前,我启动kafka的对接者-Compose.yml看起来是这样的:

代码语言:javascript
复制
version: '2'
networks:
  default:
    external: true
    name: flink-network
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    command: sh -c "((sleep 15 && kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic flagged-transactions)&) && /etc/confluent/docker/run "

    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

我用docker-compose up -d启动卡夫卡容器,然后进入flink的web界面并提交作业。它运行,但我得到了例外的Failed to send data to Kafka: Topic flagged-transactions not present in metadata after 60000 ms.,如果我访问卡夫卡与docker exec,我可以看到,我的flagged-transactions主题在那里,所以我认为这一定是一个网络问题。

最初的docker-compose.yml来自这里,我在故障排除过程中添加了一些内容。我不太清楚它的很多网络方面,所以很多都是我的猜测,并复制/粘贴各种问题的解决方案。我尝试过yml的environment部分的几个变体(例如,将localhost改为kafka,反之亦然),但它们都不起作用。

有什么建议吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-10 10:32:47

乍一看,有几件事:

公开端口并公布侦听器

您正在暴露端口29092在您的对接-撰写,这是配置为“卡夫卡”主机名。如果你愿意的话,你很可能无法通过你的主机与卡夫卡进行交流。您需要公开9092:9092 (即localhost),这样来自客户端(localhost)的目标名称(Localhost)与端口上的主机名(如果有意义的话)匹配。您所链接的撰写文件公开了29092,但正确地在"localhost“上使用了29092,而在"kafka”上正确地使用了9092来使其工作,您已经将其设置为相反的方式。

容器-通过容器名称进行容器通信

您的作业函数运行在一个自带的flink容器上,试图连接到引导服务器"localhost:9092",因为它试图访问自己,因此无法工作。你需要把它改成“卡夫卡:29092”,这样它才能与卡夫卡容器沟通。如果您正在主机上运行flink作业,则localhost:9092将工作。

其他东西

做出这些改变,看看你是否幸运。您应该能够通过localhost:9092 (使用Kafka客户端、portqry、netcat等)从主机上看到代理服务器。并从您的flink容器通过卡夫卡:29092(可能使用netcat等)。

如果桥接器存在网络问题,请尝试将flink容器添加到同一个坞-组合文件中,并从YAML文件中删除“网络”条目,默认情况下在它们自己的网络上启动组。如果容器在自己的网络上并正确配置,则容器可以通过容器名称进行通信。

卡夫卡听众基本上需要匹配客户端连接的内容。因此,如果您从主机连接到"localhost: 9092 ",则9092的侦听器必须是"localhost“,以便预期的名称匹配,否则连接将失败。如果我们通过主机名"kafka“从另一个容器连接到端口29092,则需要将侦听器配置为"kafka:29092",以此类推。

如果所有这些都不起作用,那么发布一些Kafka服务器容器的日志,我们就从那里开始。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69126331

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档