我正在通过pub/sub将数据摄取到一个数据流管道中,该管道正在无限制地运行。这些数据基本上与从跟踪设备中捕获的时间戳相协调。这些消息以批方式到达,每个批处理可能是1..n条消息。在某一段时间内,可能不会有任何消息到达,这可能会在稍后(或不)引起不满。我们使用每个坐标的时间戳( UTC)作为发布子消息的属性。并通过时间戳标签读取管道:
pipeline.apply(PubsubIO.Read.topic("new").timestampLabel("timestamp")坐标和延迟的示例如下:
36 points wait 0:02:24
36 points wait 0:02:55
18 points wait 0:00:45
05 points wait 0:00:01
36 points wait 0:00:33
36 points wait 0:00:43
36 points wait 0:00:34一条消息可能看起来像:
2013-07-07 09:34:11;47.798766;13.050133
在第一批之后,水印是空的,在第二批之后,我可以看到管道诊断中的水印,只是它没有被更新,尽管新的消息到达了。此外,根据堆栈驱动程序日志记录,PubSub没有未传递或未确认的消息。
水印不应该随着新事件时间的到来而向前移动吗?
根据What is the watermark heuristic for PubsubIO running on GCD?的说法,WaterMark也应该每隔2分钟前进一次,而不是吗?
。。如果我们在超过两分钟内还没有看到订阅数据(而且没有积压),我们将水印提升到接近实时的位置。。。
更新以解决Bens问题:
我们能查一下工作证件吗?
是的,我刚刚重新启动了整个设置在09:52CET,这是07:52协调世界时,工作ID 2017-05_05_00_49_11-11176509843641901704。
您使用的是什么版本的SDK?
1.9.0
如何使用时间戳标签发布消息?
我们使用python脚本发布数据,它使用pub子sdk。来自那里的消息可能看起来是:
{“数据”:{时间戳;lat;long;ele},“时间戳”:'2017-05-05T07:45:51Z'}
我们在数据流中使用时间戳属性作为时间戳标记。
水印卡在什么地方?
对于这项工作,水印现在停留在09:57:35 (我在10:10左右发布这篇文章),尽管新的数据被发送到了例如
10:05:14
10:05:43
10:06:30我还可以看到,在延迟超过10秒的情况下,我们可能会向pub子发布数据,例如,在10:07:47,我们发布最高时间戳为10:07:26的数据。
过了几个小时,水印就迎刃而解了,但我不明白为什么它会被延迟,/not在开始移动。
发布于 2017-05-05 19:40:21
这是PubSub水印跟踪逻辑中的一种边缘情况,它有两个工作范围(见下文)。本质上,如果没有输入2分钟,那么水印将前进到当前时间。但是,如果数据到达速度比每2分钟快,但仍然处于非常低的QPS,那么就没有足够的数据来保持估计的水印更新。
正如我所提到的,有几个工作是围绕着:
发布于 2018-12-30 19:32:30
为了记录在案,关于前面提到的直接运行程序上下文中的边缘情况,另一件需要记住的事情是运行程序的并行性。拥有更高的并行性,这是默认的,特别是在多核机器上,似乎需要更多的数据。在我的例子中,一个设置--targetParallelism=1提供了帮助。基本上是在没有任何其他干预的情况下,将一条卡住的管道转化为一条工作管道。
https://stackoverflow.com/questions/43779019
复制相似问题