首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink SQL单元测试:如何分配水印?

Flink SQL单元测试:如何分配水印?
EN

Stack Overflow用户
提问于 2020-09-23 21:40:01
回答 1查看 758关注 0票数 0

我正在为使用match_recognize的Flink SQL语句编写单元测试。我正在设置这样的测试数据

代码语言:javascript
复制
Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
  DataTypes.FIELD("foobar", DataTypes.STRING()),
  ....
  ),
  row(...),
  row(...)
);

我有两个问题,

  • 如何将event_time指定为水印域?(表示行时)
  • 不重要的是,给表创建一个有意义的名称?

FLINK版本: 1.11

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-09-28 08:34:43

您遇到了Table的当前限制:不可能结合forValues方法定义水印和行时属性;您需要一个连接器。有几种方法可以解决这一问题:

1.使用与VALUES堆在一起的csv连接器,如这个例子中所示。

2.使用内置DataGen连接器.由于您正在为CEP设置一个单元测试,我认为您希望对生成的数据进行某种程度的控制,因此这可能不是一个可行的选择。不管怎样,我想我该提一提。

注意:使用SQL语法的是从Flink 1.10创建表的推荐方法。这将使您想要做的两件事(即定义水印和命名表)变得更加简单:

代码语言:javascript
复制
tEnv.executeSql("CREATE TABLE table_name (\n" +
                "             event_time TIMESTAMP(3),\n" +
                "             foobar STRING \n" +
                "             WATERMARK FOR event_time AS event_time\n" +
                ") WITH (...)"
);

Table data = tEnv.from("table_name");

水印声明为计算列,可以选择使用多个水印策略。有关更多细节,请查看此文档页

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64036629

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档