首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink Table to DataStream:如何访问列名?

Flink Table to DataStream:如何访问列名?
EN

Stack Overflow用户
提问于 2021-03-30 21:47:30
回答 1查看 151关注 0票数 0

我想使用Flink SQL将Kafka主题消费到表中,然后将其转换回DataStream。

这是SOURCE_DDL

代码语言:javascript
复制
CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)

使用Flink,我执行DDL。

代码语言:javascript
复制
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")

然后,我将其转换为DataStream,并在map(e => ...)部分中执行下游逻辑。

代码语言:javascript
复制
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)

map(e => ...)部件中,我想访问列名,在本例中为last_5_clicks。为什么?因为我可能有不同的源代码和不同的列名(比如last_10min_page_view),但是我希望在map(e => ...)中重用这些代码。

有没有办法做到这一点?谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-31 19:11:03

从Flink 1.12开始,可以通过Table.getSchema.getFieldNames访问它。从1.13版本开始,可以通过Row.getFieldNames访问。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66872184

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档