当我处理一个大文件(5行million+行)时,会看到一些奇怪的东西。在代码中,我的碎片数被设置为1。然而,当我看到超过15个文件被踢出时,我不知道为什么。谷歌搜索并没有给我任何解释为什么会发生这种情况。
作为一个片段,下面是输出部分:
results.get(valid).setCoder(StringUtf8Coder.of()).apply("Build Window For Valid Entries", Window.<String>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO))).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
.apply("Write valid to GCS", TextIO.write().to(validFileLocation).withSuffix(".csv").withWindowedWrites().withNumShards(1));有人有什么想法吗?
发布于 2018-11-14 20:01:22
最后我弄明白了。GlobalWindows很好,但是我想要做的最好的方法是使用会话窗口。这解决了这个问题。
一个例子是Sessions.withGapDuration(Duration.standardSeconds(10))))
上面写着,“在最后一次收到数据包的10秒后,如果我没有得到任何其他信息,继续输出”。
发布于 2018-10-21 11:12:43
如果您的输入是无限制的,那么如何只编写一个文件?
TextIO必须等待无界输入的结束,或者使用窗口(边界数据)将其分割。
我不知道在GlobalWindow上触发的预期行为,也许只有最后一个文件包含所有的数据?或者你达到了GCS的大小限制?
此外,GCS不允许对文件进行修改,只能添加或删除文件。
https://stackoverflow.com/questions/52898185
复制相似问题