首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在python中(单元)测试apache-beam中的流管道?

如何在python中(单元)测试apache-beam中的流管道?
EN

Stack Overflow用户
提问于 2019-06-18 18:53:46
回答 1查看 723关注 0票数 0

我写了一些流水线(从Pub/Sub开始),我想给它添加一些窗口机制。现在我想以适当的方式来测试它,那么如何创建一些“虚拟”流呢?

我的代码:

代码语言:javascript
复制
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=pipeline_options, runner=DirectRunner())
xmls_beam = beam.Create(xmls)
x = p | xmls_beam | beam.FlatMap(process_xmls) | beam.ParDo(FilterTI()) | beam.WindowInto(window.FixedWindows(200)) | beam.GroupByKey()
result = p.run()
result.wait_until_finish()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-19 08:43:06

您可以使用TimestampedValue的PCollection来模拟“虚拟流”。

例如,如果您的输入是:

代码语言:javascript
复制
    l = [window.TimestampedValue('a', 100), window.TimestampedValue('b', 300)]
    pc = p | beam.Create(l) | ...

在你的例子中(宽度为200的固定窗口),你可以预期输出元素'a‘落入第一个窗口,而'b’落入第二个窗口。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56647500

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档