首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Apache Flink中使用Debezium连接器

如何在Apache Flink中使用Debezium连接器
EN

Stack Overflow用户
提问于 2020-11-21 05:19:19
回答 1查看 561关注 0票数 0

我正在尝试使用flink的table API创建一个使用Debezium源函数的表,我在https://github.com/ververica/flink-cdc-connectors中找到了这些函数的一个实现,并在我的代码中像这样使用它们:

代码语言:javascript
复制
val debeziumProperties = new Properties()
  debeziumProperties.setProperty("plugin.name", "wal2json")
  debeziumProperties.setProperty("format", "debezium-json")

  val sourceFunction: DebeziumSourceFunction[TestCharge] = PostgreSQLSource.builder()
    .hostname("******")
    .port(5432)
    .database("*****") // monitor all tables under inventory database
    .username("*****")
    .password("*****")
    .debeziumProperties(debeziumProperties)
    .deserializer(new CustomDebeziumDeserializer) // converts SourceRecord to String
    .build()

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val sTableEnv = StreamTableEnvironment.create(env, sSettings)

  val cdcStream: DataStream[TestCharge] = env
    .addSource(sourceFunction)
    .map(x => x)

  sTableEnv.createTemporaryView("historic", cdcStream, 'chargeId, 'email, 'amount, 'cardHash)
  val table: Table = sTableEnv.sqlQuery("SELECT SUM(amount) FROM historic GROUP BY chargeId")

  val reverse = sTableEnv.toRetractStream[Row](table)

  reverse.print()

我还按照文档中的描述添加了这个依赖项:

代码语言:javascript
复制
"com.alibaba.ververica" % "flink-sql-connector-postgres-cdc" % "1.1.0"

当我尝试在迷你集群上本地运行我的作业时,它工作得很好,但是在Kubernetes上配置的Flink集群中,它给了我这个异常:

代码语言:javascript
复制
Caused by: io.debezium.DebeziumException: No implementation of Debezium engine builder was found

有没有人知道可能会发生什么,或者我是否遗漏了一些依赖?

提前谢谢。

EN

回答 1

Stack Overflow用户

发布于 2020-11-21 16:33:32

如果想在TableAPI/SQL中使用它,只需使用SQL注册表即可。

代码语言:javascript
复制
sTableEnv.executeSql(
      """
        |CREATE TABLE shipments (
        |  shipment_id INT,
        |  order_id INT,
        |  origin STRING,
        |  destination STRING,
        |  is_arrived BOOLEAN
        |) WITH (
        |  'connector' = 'postgres-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '5432',
        |  'username' = 'postgres',
        |  'password' = 'postgres',
        |  'database-name' = 'postgres',
        |  'schema-name' = 'public',
        |  'table-name' = 'shipments'
        |)
        |""".stripMargin)
// then you can query the table
  val table: Table = sTableEnv.sqlQuery("SELECT SUM(shipment_id) FROM shipments GROUP BY order_id")

这是使用CDC源代码的最简单方法。因为目前Table不支持将changelog流转换为Table

关于你的问题,我认为这可能是因为依赖冲突。请检查您是否依赖于其他版本的<artifactId>debezium-embedded</artifactId>。如果是,请将其删除。flink-sql-connector-postgres-cdc已经将其打包到版本1.12中。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64937067

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档