我们使用flink从一些IoT传感器生成事件。每个传感器都可以用来产生不同类型的事件(如温度、湿度等)。一对多的比率(传感器->启用事件)。
存储在关系数据库中的传感器和启用事件之间的映射
为了丰富传感器数据,我们将传感器数据和表API连接起来。只需添加带有启用事件列表的元数据。
因此,如果某些特定的sensor-123只启用了TEMP和PRESSURE事件,如何仅将传感器数据发送到这两个已定义的进程函数?
人们会想到以下几点:
val enriched: DataStream[EnrichedSensorData] = ...
val temp = enriched.filter(x => isTempEnabled(x)).process(....)
val humd = enriched.filter(x => isHumdEnabled(x)).process(....)
val press = enriched.filter(x => isPressEnabled(x)).process(....),
。

发布于 2022-01-26 23:02:21
https://stackoverflow.com/questions/70853380
复制相似问题