参考项目:Watching for new files matching a filepattern in Apache Beam
你能在简单的用例中使用它吗?我的用例是让用户将数据上传到云存储-> Pipeline (Process csv to json) -> Big Query。我知道云存储是有界的集合,所以它代表了批量数据流。
我想要做的是保持流水线在流模式下运行,一旦文件上传到云存储,它就会通过流水线进行处理。使用watchfornewfiles可以做到这一点吗?
我写的代码如下:
p.apply(TextIO.read().from("<bucketname>")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()));没有内容被转发到Big Query,但是管道显示它是流的。
发布于 2018-07-25 09:00:50
你可以在这里使用谷歌云存储触发器:https://cloud.google.com/functions/docs/calling/storage#functions-calling-storage-python
这些触发器使用类似于云发布/订阅的云函数,如果对象是:创建/删除/存档/或元数据更改,则会在对象上触发。
这些事件使用云存储的发布/订阅通知方式发送,但请注意不要在同一存储桶上设置太多函数,因为存在一定的通知限制。
此外,在文档的末尾有一个到示例实现的链接。
https://stackoverflow.com/questions/50688639
复制相似问题