首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache梁-从第一个元素开始滑动窗口

Apache梁-从第一个元素开始滑动窗口
EN

Stack Overflow用户
提问于 2021-06-14 18:55:55
回答 1查看 552关注 0票数 0

我正在尝试开发数据流管道,使用有界的滑动窗口和使用的流数据集。管道如下:

  • 读取数据
  • 分配时间戳
  • 使用3个大小和1个周期的SlidingWindows()窗口
  • 对窗口中的元素进行分组
  • 打印输出

样本数据:

代码语言:javascript
复制
  data = [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1},
          {'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2},
          {'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3},
          {'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}]

横梁管道:

代码语言:javascript
复制
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions

    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True 

    p = beam.Pipeline(options=pipeline_options)

    with beam.Pipeline(options=pipeline_options) as p:
         events =  (p | 'Create Events' >> beam.Create(data) \
                      | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, 
                                              x['timestamp'])) \
                      | 'PairWithOne' >> beam.Map(lambda x: (None, x)) \
                      |'Sliding Window'>>beam.WindowInto(beam.window.SlidingWindows(3,1))
                      | 'Group by key' >> beam.GroupByKey()
                      |beam.Map(print))

我得到的输出:

代码语言:javascript
复制
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}, {'serverID': 
    'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID': 'server_1', 
    'CPU_Utilization': 2, 'timestamp': 3}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}, {'serverID': 
    'server_1', 'CPU_Utilization': 1, 'timestamp': 2}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID': 
    'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID': 'server_1', 
    'CPU_Utilization': 3, 'timestamp': 4}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID': 
    'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])

预期输出(即在启动窗口时间戳时丢弃数据,小于数据中第1行或元素的时间戳):

代码语言:javascript
复制
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 0, 'timestamp': 1}, {'serverID': 
    'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID': 'server_1', 
    'CPU_Utilization': 2, 'timestamp': 3}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 1, 'timestamp': 2}, {'serverID': 
    'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID': 'server_1', 
    'CPU_Utilization': 3, 'timestamp': 4}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 2, 'timestamp': 3}, {'serverID': 
    'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])
    (None, [{'serverID': 'server_1', 'CPU_Utilization': 3, 'timestamp': 4}])

我也尝试过AfterCount( n )触发器,但当数据点数小于n时,此触发器不考虑数据。

如果能在这方面提供任何帮助,我们将不胜感激。

EN

回答 1

Stack Overflow用户

发布于 2021-06-14 19:57:17

它可能是按预期工作的。您能否尝试通过将输出写入文件而不是使用打印来验证输出?可能会有多个窗格被触发和打印,但是accumulation_mode正在丢弃。

此外,您还可以尝试在调试流管道时使用波束记事本。使用类似show(pcoll, include_window_info=True)的东西来可视化事件时间、窗口和PCollection的窗格信息,而不是打印它们。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67975766

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档