运行环境
(confluent-7.1.0)
问题
分布式连接生成相同的消息三次(每个连接任务有一条消息)
下面是当连接器任务生成message.
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)消费者使用相同的信息
(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1
hello
hello
helloFTP连接器
{
"ftp-test-conn": {
"info": {
"name": "ftp-test-conn",
"config": {
"connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
"connect.ftp.address": "<FTP HOST IP>",
"connect.ftp.keystyle": "string",
"compression.type": "gzip",
"connect.ftp.user": "ftpusername",
"connect.ftp.refresh": "PT1M",
"tasks.max": "3",
"connect.ftp.file.maxage": "P7D",
"name": "ftp-test-conn",
"connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
"connect.ftp.timeout": "3000000",
"connect.ftp.password": "<PASSWORD>"
},
"tasks": [
{
"connector": "ftp-test-conn",
"task": 0
},
{
"connector": "ftp-test-conn",
"task": 1
},
{
"connector": "ftp-test-conn",
"task": 2
}
],
"type": "source"
},
"status": {
"name": "ftp-test-conn",
"connector": {
"state": "RUNNING",
"worker_id": "<BROKER 1 IP>:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "<BROKER 1 IP>:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "<BROKER 2 IP>:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "<BROKER 3 IP>:8083"
}
],
"type": "source"
}
}
}发布于 2022-06-26 13:24:20
每个任务都很可能读取相同的文件。只尝试设置tasks.max=1。更具体地说,FTP客户端之间没有文件系统锁定(每个任务都启动自己的连接),因此您将只限于一个读取器任务。
仔细查看日志,您可以在[ftp-test-conn|task-0]中看到任务ID
此外,不建议在与代理相同的主机上运行Connect。
https://stackoverflow.com/questions/72759729
复制相似问题