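I am trying to develop a Dataflow pipeline that applies sliding windows to both bounded and streaming datasets. The pipeline looks like this:
Sample data: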
data = [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1},
        {'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2},
        {'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3},
        {'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}]
Beam pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    events = (
        p
        | 'Create Events' >> beam.Create(data)
        # Use each element's 'timestamp' field as its event time.
        | 'Add Timestamps' >> beam.Map(
            lambda x: beam.window.TimestampedValue(x, x['timestamp']))
        # Put everything under one key so grouping happens per window only.
        | 'PairWithOne' >> beam.Map(lambda x: (None, x))
        # Windows of size 3 seconds, with a new window starting every second.
        | 'Sliding Window' >> beam.WindowInto(beam.window.SlidingWindows(3, 1))
        | 'Group by key' >> beam.GroupByKey()
        | beam.Map(print))
The output I get:
(None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}, {'serverID':
'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID': 'server_1',
'CPU_Utilization': 2, 'timestamp': 3}])
(None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}, {'serverID':
'server_1', 'CPU_Utilization': 1, 'timestamp': 2}])
(None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}])
(None, [{'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID':
'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID': 'server_1',
'CPU_Utilization': 3, 'timestamp': 4}])
(None, [{'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID':
'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])
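(None, [{'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])
Expected output (that is, discard a window when its start timestamp is earlier than the timestamp of the first row, or element, in the data):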
(None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}, {'serverID':
'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID': 'server_1',
'CPU_Utilization': 2, 'timestamp': 3}])
(None, [{'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID':
'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID': 'server_1',
'CPU_Utilization': 3, 'timestamp': 4}])
(None, [{'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID':
'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])
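(None, [{'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])
I also tried the AfterCount(n) trigger, but that trigger emits nothing when a window holds fewer than n data points.
Any help with this would be greatly appreciated.
Posted on 2021-06-14 19:57:17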
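It might be working as intended. Could you verify the output by writing it to a file instead of using print? Multiple panes may be fired and printed, but the accumulation_mode is discarding.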
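To see why six groups can be the intended result, here is a minimal sketch in plain Python (no Beam) that models how sliding windows of size 3 and period 1 are assigned to integer timestamps. The helper names sliding_windows and group_by_window are hypothetical, and the model assumes a window offset of 0; it is an illustration of the arithmetic, not Beam's implementation. The last step applies the filter described in the question, keeping only windows that start at or after the first element's timestamp.

```python
def sliding_windows(timestamp, size, period):
    """Return every [start, end) window containing `timestamp`.

    Simplified integer model of sliding-window assignment (offset 0).
    """
    last_start = timestamp - timestamp % period
    starts = range(last_start, timestamp - size, -period)
    return [(start, start + size) for start in starts]


def group_by_window(rows, size, period):
    """Collect CPU_Utilization values per window, ordered by window start."""
    groups = {}
    for row in rows:
        for win in sliding_windows(row['timestamp'], size, period):
            groups.setdefault(win, []).append(row['CPU_Utilization'])
    return dict(sorted(groups.items()))


data = [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1},
        {'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2},
        {'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3},
        {'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}]

groups = group_by_window(data, size=3, period=1)

# Drop windows that start before the first element's timestamp.
first_ts = min(row['timestamp'] for row in data)
kept = {win: vals for win, vals in groups.items() if win[0] >= first_ts}

for win, vals in groups.items():
    status = 'keep' if win in kept else 'drop'
    print(win, vals, status)
```

Under this model the element at timestamp 1 also falls into the windows starting at -1 and 0, which accounts for the two extra groups in the observed output; the four windows that survive the filter match the expected output in the question.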
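In addition, you could try Beam notebooks when debugging streaming pipelines. Use something like show(pcoll, include_window_info=True) to visualize the event time, windows, and pane info of a PCollection instead of printing them.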
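Outside a notebook, a comparable view can be had by attaching each group's window bounds and writing the result to a text file, which combines both suggestions above. The following is only a sketch under assumptions: format_group and run are hypothetical names, the output prefix is whatever path you pass in, and the pipeline reuses the steps from the question. It relies on beam.DoFn.WindowParam to receive the window of each element and on beam.io.WriteToText to write the lines.

```python
def format_group(key, values, window_start, window_end):
    """Render one grouped result together with its window bounds."""
    stamps = sorted(v['timestamp'] for v in values)
    return f'[{window_start}, {window_end}) key={key} timestamps={stamps}'


def run(data, output_prefix):
    """Run the question's pipeline, writing window-annotated lines to text."""
    # Imported here so format_group stays usable without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | 'Create Events' >> beam.Create(data)
            | 'Add Timestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(x, x['timestamp']))
            | 'PairWithOne' >> beam.Map(lambda x: (None, x))
            | 'Sliding Window' >> beam.WindowInto(
                beam.window.SlidingWindows(3, 1))
            | 'Group by key' >> beam.GroupByKey()
            # WindowParam asks Beam to pass in the window of each element.
            | 'Attach window' >> beam.Map(
                lambda kv, w=beam.DoFn.WindowParam: format_group(
                    kv[0], kv[1], int(w.start), int(w.end)))
            # Return to a single global window before writing to the sink.
            | 'Global window' >> beam.WindowInto(beam.window.GlobalWindows())
            | 'Write' >> beam.io.WriteToText(output_prefix))
```

Calling run(data, some_prefix) with the sample data should produce one line per window, so it becomes visible which groups belong to windows that start before the first element's timestamp.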
https://stackoverflow.com/questions/67975766