Usecase:从Kafka读取protobuf消息,反序列化它们,应用一些转换(平放一些列),然后写到dynamodb。
不幸的是,Kafka Flink连接器只支持- csv、json和avro格式。因此,我不得不使用更低级别的API (datastream)。
问题:--如果我可以用datastream对象创建一个表,那么我就可以接受在该表上运行的查询。这将使转换部分无缝和通用。可以在datastream对象上运行SQL查询吗?
发布于 2020-04-28 19:40:39
如果您有一个对象的DataStream,那么您可以使用StreamTableEnvironment简单地将给定的DataStream注册为表。
这看起来或多或少如下所示:
val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])然后,您应该能够查询从DataStream创建的动态表。
https://stackoverflow.com/questions/61485484
复制相似问题