我想使用Flink SQL将Kafka主题消费到表中,然后将其转换回DataStream。
这是SOURCE_DDL
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)使用Flink,我执行DDL。
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")然后,我将其转换为DataStream,并在map(e => ...)部分中执行下游逻辑。
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)在map(e => ...)部件中,我想访问列名,在本例中为last_5_clicks。为什么?因为我可能有不同的源代码和不同的列名(比如last_10min_page_view),但是我希望在map(e => ...)中重用这些代码。
有没有办法做到这一点?谢谢。
发布于 2021-03-31 19:11:03
从Flink 1.12开始,可以通过Table.getSchema.getFieldNames访问它。从1.13版本开始,可以通过Row.getFieldNames访问。
https://stackoverflow.com/questions/66872184
复制相似问题