We are trying to enrich events from a Kafka source with existing CDC data, joining on the event key against the table maintained by a DB (upsert) connector: Kafka source (id, B, C) + CDC (id, D, E, F) = result (id, B, C, D, E, F), written to a Kafka sink (append):
INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT payload.zapatos, payload.naranjas, source.device_id, account_id, user_id FROM source
JOIN mongodb_source ON source.device_id = mongodb_source._id

The problem is that this only works when our Kafka sink is 'upsert-kafka'. But that sink emits tombstones whenever a row is deleted in the DB. What we need are plain append events, not a changelog. Yet we cannot simply switch to the 'kafka' sink, because the DB connector produces upserts, which makes the two incompatible.
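For reference, the plain append sink we would rather write to would be declared like this (a sketch reusing the field names and topic from the job below). Flink rejects an INSERT into it from this join, because the CDC side of the join produces update and delete changes that an append-only sink cannot consume:

CREATE TABLE sink (
  `zapatos` INT,
  `naranjas` STRING,
  `account_id` STRING,
  `user_id` STRING,
  `device_id` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'as-test-output-flink-topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
)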
How can this be achieved? By converting the upserts into append events somehow? (A sketch of that idea follows the job below.)
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
# use the Blink table planner
st_env = StreamTableEnvironment \
    .create(s_env, environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner()
            .build())
ddl = """CREATE TABLE sink (
`zapatos` INT,
`naranjas` STRING,
`account_id` STRING,
`user_id` STRING,
`device_id` STRING,
`time28` INT,
PRIMARY KEY (device_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'as-test-output-flink-topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'testGroup',
'key.format' = 'raw',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
"""
st_env.sql_update(ddl)
ddl = """CREATE TABLE source (
`device_id` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`event_type` STRING,
`payload` ROW<`zapatos` INT, `naranjas` STRING, `time28` INT, `device_id` STRING>,
`trace_id` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'as-test-input-flink-topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'testGroup',
'key.format' = 'raw',
'key.fields' = 'device_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
"""
st_env.sql_update(ddl)
ddl = """
CREATE TABLE mongodb_source (
`_id` STRING PRIMARY KEY,
`account_id` STRING,
`user_id` STRING,
`device_id` STRING
) WITH (
'connector' = 'mongodb-cdc',
'uri' = '******',
'database' = '****',
'collection' = 'testflink'
)
"""
st_env.sql_update(ddl)
st_env.sql_update("""
INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source
JOIN mongodb_source ON source.device_id = mongodb_source._id
""")
# execute
st_env.execute("kafka_to_kafka")不要介意Mongo连接器,它是新的,但作为mysql-cdc或postgre工作。
Thanks for your help!
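One way to express the "convert the upserts into append events" idea is to pull the joined (updating) table out of SQL, drop the retraction records, and hand the remaining inserts to a plain Kafka producer. A minimal sketch, assuming PyFlink 1.13-era APIs; the row type and the lambda-based filtering are illustrative and not part of the original job:

from pyflink.common.serialization import JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer

# The same enrichment join, kept as an (updating) Table instead of an INSERT.
joined = st_env.sql_query("""
    SELECT payload.zapatos AS zapatos, payload.naranjas AS naranjas,
           source.device_id, account_id, user_id FROM source
    JOIN mongodb_source ON source.device_id = mongodb_source._id
""")

row_type = Types.ROW_NAMED(
    ['zapatos', 'naranjas', 'device_id', 'account_id', 'user_id'],
    [Types.INT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])

# to_retract_stream() emits (is_insert, row) pairs; keeping only the
# is_insert records turns the changelog into an append-only event stream.
appends = st_env.to_retract_stream(joined, row_type) \
    .filter(lambda change: change[0]) \
    .map(lambda change: change[1], output_type=row_type)

appends.add_sink(FlinkKafkaProducer(
    topic='as-test-output-flink-topic',
    serialization_schema=JsonRowSerializationSchema.builder()
        .with_type_info(row_type).build(),
    producer_config={'bootstrap.servers': 'kafka:9092'}))

With this approach a DB update surfaces as a fresh append event (the retraction of the old value is dropped), and a DB delete is silently discarded instead of being emitted as a tombstone.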
Posted on 2022-02-26 23:54:24
Have you tried using a LEFT JOIN instead of a plain JOIN? It shouldn't create tombstones then, if your purpose is just to enrich the Kafka events with whatever data exists in Mongo…
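Applied to the job in the question, that suggestion changes only the final statement (same tables and columns as above):

st_env.sql_update("""
INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT payload.zapatos, payload.naranjas, source.device_id, account_id, user_id FROM source
LEFT JOIN mongodb_source ON source.device_id = mongodb_source._id
""")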
https://stackoverflow.com/questions/68376468