我正在努力学习Flink,Docker和Kafka,所以我正在尝试设置一个简单的虚拟设置,让Flink和Kafka从不同的容器中进行通信。我正在本地完成所有这些工作,但我也需要了解如何通过从不同位置运行的容器来实现这一点,所以我不想仅仅为了让它正常工作而采取任何捷径。
我有一个flink作业,目前只是scala中欺诈检测示例的精简版本。我在一个虚拟机上运行了Kafka和Flink,所以这里唯一相关的部分可能是引导服务器。以下是工作的全部主要功能的内容:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val transactions: DataStream[Transaction] = env
.addSource(new TransactionSource)
.name("transactions")
val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
val myProducer = new FlinkKafkaProducer[Transaction](
"flagged-transactions", // target topic
TransactionSchema("flagged-transactions"),
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
transactions.addSink(myProducer)
env.execute("transactions")我用docker network create flink-network启动了一个码头网络。然后用docker run -d --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest jobmanager启动flink,用docker run -d --rm --name=taskmanager --network flink-network --publish 9092:9092 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest taskmanager启动任务管理器
目前,我启动kafka的对接者-Compose.yml看起来是这样的:
version: '2'
networks:
default:
external: true
name: flink-network
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
command: sh -c "((sleep 15 && kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic flagged-transactions)&) && /etc/confluent/docker/run "
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1我用docker-compose up -d启动卡夫卡容器,然后进入flink的web界面并提交作业。它运行,但我得到了例外的Failed to send data to Kafka: Topic flagged-transactions not present in metadata after 60000 ms.,如果我访问卡夫卡与docker exec,我可以看到,我的flagged-transactions主题在那里,所以我认为这一定是一个网络问题。
最初的docker-compose.yml来自这里,我在故障排除过程中添加了一些内容。我不太清楚它的很多网络方面,所以很多都是我的猜测,并复制/粘贴各种问题的解决方案。我尝试过yml的environment部分的几个变体(例如,将localhost改为kafka,反之亦然),但它们都不起作用。
有什么建议吗?
发布于 2021-09-10 10:32:47
乍一看,有几件事:
公开端口并公布侦听器
您正在暴露端口29092在您的对接-撰写,这是配置为“卡夫卡”主机名。如果你愿意的话,你很可能无法通过你的主机与卡夫卡进行交流。您需要公开9092:9092 (即localhost),这样来自客户端(localhost)的目标名称(Localhost)与端口上的主机名(如果有意义的话)匹配。您所链接的撰写文件公开了29092,但正确地在"localhost“上使用了29092,而在"kafka”上正确地使用了9092来使其工作,您已经将其设置为相反的方式。
容器-通过容器名称进行容器通信
您的作业函数运行在一个自带的flink容器上,试图连接到引导服务器"localhost:9092",因为它试图访问自己,因此无法工作。你需要把它改成“卡夫卡:29092”,这样它才能与卡夫卡容器沟通。如果您正在主机上运行flink作业,则localhost:9092将工作。
其他东西
做出这些改变,看看你是否幸运。您应该能够通过localhost:9092 (使用Kafka客户端、portqry、netcat等)从主机上看到代理服务器。并从您的flink容器通过卡夫卡:29092(可能使用netcat等)。
如果桥接器存在网络问题,请尝试将flink容器添加到同一个坞-组合文件中,并从YAML文件中删除“网络”条目,默认情况下在它们自己的网络上启动组。如果容器在自己的网络上并正确配置,则容器可以通过容器名称进行通信。
卡夫卡听众基本上需要匹配客户端连接的内容。因此,如果您从主机连接到"localhost: 9092 ",则9092的侦听器必须是"localhost“,以便预期的名称匹配,否则连接将失败。如果我们通过主机名"kafka“从另一个容器连接到端口29092,则需要将侦听器配置为"kafka:29092",以此类推。
如果所有这些都不起作用,那么发布一些Kafka服务器容器的日志,我们就从那里开始。
https://stackoverflow.com/questions/69126331
复制相似问题