首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink源kafka与CDC源代码连接到kafka接收器

Flink源kafka与CDC源代码连接到kafka接收器
EN

Stack Overflow用户
提问于 2021-07-14 10:35:29
回答 1查看 392关注 0票数 1

我们正试图从DB连接器(upsert )表中加入。与“卡夫卡”事件的来源,以丰富这一事件的关键与现有的cdc数据。kafka源(id,B,C) + cdc (id,D,E,F) =结果(id,B,C,D,E,F)进入kafka接收器(附加)

代码语言:javascript
复制
INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
JOIN mongodb_source ON source.device_id = mongodb_source._id

问题是,这只有当我们的卡夫卡水槽是“插入-卡夫卡”时才能奏效。但这在DB中删除时创建了墓碑。我们需要的只是简单的事件,而不是变化。但是我们不能只使用“kafka”接收器,因为db连接器是向上插入的,所以不兼容.

怎样才能做到这一点?将插入转换为附加事件?

代码语言:javascript
复制
s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

   ddl = """CREATE TABLE sink (
            `zapatos` INT,
            `naranjas` STRING,
            `account_id` STRING,
            `user_id` STRING,
            `device_id` STRING,
            `time28` INT,
            PRIMARY KEY (device_id) NOT ENFORCED
        ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'as-test-output-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """CREATE TABLE source (
            `device_id` STRING,
            `timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
            `event_type` STRING,
            `payload` ROW<`zapatos` INT, `naranjas` STRING, `time28` INT, `device_id` STRING>,
            `trace_id` STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'as-test-input-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'key.fields' = 'device_id',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """
    CREATE TABLE mongodb_source (
        `_id` STRING PRIMARY KEY, 
        `account_id` STRING,
        `user_id` STRING,
        `device_id` STRING
    ) WITH (
        'connector' = 'mongodb-cdc',
        'uri' = '******',
        'database' = '****',
        'collection' = 'testflink'
    )
    """
    st_env.sql_update(ddl)


    st_env.sql_update("""
        INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
        SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
        JOIN mongodb_source ON source.device_id = mongodb_source._id
     """)

# execute
    st_env.execute("kafka_to_kafka")

不要介意Mongo连接器,它是新的,但作为mysql-cdc或postgre工作。

谢谢你的帮忙!

EN

回答 1

Stack Overflow用户

发布于 2022-02-26 23:54:24

您试过使用左联接而不是联接吗?它不应该创造墓碑,那么如果你的目的只是充实卡夫卡事件,如果有任何来自蒙戈…

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68376468

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档