
Kafka Connect in distributed mode produces duplicate messages

Stack Overflow user
Asked 2022-06-26 07:42:21
Answers: 1, Views: 48, Followers: 0, Votes: 1

Environment

(confluent-7.1.0)

  • Three servers
  • Three Kafka brokers, Connect, Schema Registry
  • FTP connector for testing (3 tasks)

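Problem

  • Connect produces duplicate messages, but I expected the FTP connector to emit one message per file.

Distributed Connect produces the same message three times (one message per connector task).

Below is the log printed when a connector task produces a message.

  • This log appears in the log of every Connect process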

[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)

The consumer reads the same message repeatedly

(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1

hello

hello

hello

FTP connector (configuration and status)

{
    "ftp-test-conn": {
        "info": {
            "name": "ftp-test-conn",
            "config": {
                "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
                "connect.ftp.address": "<FTP HOST IP>",
                "connect.ftp.keystyle": "string",
                "compression.type": "gzip",
                "connect.ftp.user": "ftpusername",
                "connect.ftp.refresh": "PT1M",
                "tasks.max": "3",
                "connect.ftp.file.maxage": "P7D",
                "name": "ftp-test-conn",
                "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
                "connect.ftp.timeout": "3000000",
                "connect.ftp.password": "<PASSWORD>"
            },
            "tasks": [
                {
                    "connector": "ftp-test-conn",
                    "task": 0
                },
                {
                    "connector": "ftp-test-conn",
                    "task": 1
                },
                {
                    "connector": "ftp-test-conn",
                    "task": 2
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "ftp-test-conn",
            "connector": {
                "state": "RUNNING",
                "worker_id": "<BROKER 1 IP>:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 1 IP>:8083"
                },
                {
                    "id": 1,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 2 IP>:8083"
                },
                {
                    "id": 2,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 3 IP>:8083"
                }
            ],
            "type": "source"
        }
    }
}

1 Answer

Stack Overflow user

Accepted answer

Answered 2022-06-26 13:24:20

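Each task is very likely reading the same files. Try setting tasks.max=1. More specifically, there is no filesystem locking between FTP clients (each task opens its own connection), so you are limited to a single reader task.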
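
As a rough sketch of applying that change, the snippet below takes a connector config, sets `tasks.max` to 1, and builds a `PUT /connectors/<name>/config` request for the Kafka Connect REST API. The worker address `http://localhost:8083` and the helper names `with_single_task` and `build_update_request` are assumptions for illustration; the request is only built here, not sent.

```python
import json
import urllib.request

# Hypothetical worker address (assumption); replace with one of your Connect workers.
CONNECT_URL = "http://localhost:8083"


def with_single_task(config: dict) -> dict:
    """Return a copy of a connector config with tasks.max forced to "1"."""
    updated = dict(config)
    updated["tasks.max"] = "1"
    return updated


def build_update_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/<name>/config request."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Trimmed copy of the config from the question (only a few keys shown).
current = {
    "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
    "name": "ftp-test-conn",
    "tasks.max": "3",
    "connect.ftp.refresh": "PT1M",
}

request = build_update_request("ftp-test-conn", with_single_task(current))
print(request.get_method(), request.full_url)
# To actually apply the change: urllib.request.urlopen(request)
```

Note that this endpoint replaces the connector's entire configuration, so a real call should send the full config from the question rather than the trimmed copy used here.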

If you look closely at the logs, you can see the task ID in [ftp-test-conn|task-0].
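
To confirm this across workers, one option is to collect which task IDs log a `fetching` line for each file. The helper below is only a sketch: the function name and the regular expression are my own, written against the log format shown in the question. Feed it the combined lines from all three Connect worker logs.

```python
import re
from collections import defaultdict

# Matches the "[connector|task-N]" context followed by "fetching <path>".
FETCH_LINE = re.compile(
    r"\[(?P<connector>[^|\]]+)\|(?P<task>task-\d+)\] fetching (?P<path>\S+)"
)


def tasks_per_file(lines):
    """Map each fetched file path to the set of task IDs that fetched it."""
    seen = defaultdict(set)
    for line in lines:
        match = FETCH_LINE.search(line)
        if match:
            seen[match.group("path")].add(match.group("task"))
    return dict(seen)


# The one "fetching" line quoted in the question; real input would span all workers.
sample = [
    "[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching "
    "/home/smheo/ftp-dir/msg-4 "
    "(com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)",
]

for path, tasks in tasks_per_file(sample).items():
    print(path, sorted(tasks))
```

If any path maps to more than one task ID, several tasks are fetching the same file, which matches the duplicate messages seen by the consumer.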

Also, running Connect on the same hosts as the brokers is not recommended.

Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/72759729
