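I'm sure this is just a good opportunity for me to learn more. I look forward to the answers, and thanks in advance!
I have been playing with timed(), trying to work out which part of my reactive stream takes a long time (I'm not sure whether this is the best approach or whether I should write my own subscriber, but I always look first for existing solutions from really smart people).
I'm seeing something very interesting. Here is a simple flow: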
Flux.range(0, 10)
.log("range")
.map(String::valueOf)
.timed()
.log("map")
.map(t -> "value-" + t.get())
.timed()
.log("concat")
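.subscribe();
If I understand this correctly, I would expect the "map" log to show the elapsed time between onNext signals.
The "concat" log would then show the same for the second map operator, which should include the time taken by the first map operation (plus a little).
There is no threading involved in this scenario (although I accept that there could be, and that the specification allows it), and the logs seem to indicate that range->map->map is a linear flow for each onNext.
The entire flow, both upstream and downstream of the operator I am timing, has to complete before the next onNext() is called. So using multiple timed() calls as I have done here is not very useful, and is probably not suitable for finding an expensive "sub-flow". Where there is no loop, for example when processing a stream of web requests, timed() is useful.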
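If the goal is to find which mapping step is expensive, rather than how far apart the signals are, one option is to time the mapper function itself. The following is only a minimal sketch in plain Java: `TimingFunction` and its methods are hypothetical helpers, not part of Reactor, and it only captures synchronous work done inside the wrapped function.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not a Reactor class): wraps a Function and
 * accumulates the time spent inside it. Not thread-safe.
 */
final class TimingFunction<T, R> implements Function<T, R> {
    private final Function<T, R> delegate;
    private final LongSupplier nanoClock; // injectable so the sketch can be tested
    private long totalNanos;
    private long calls;

    TimingFunction(Function<T, R> delegate, LongSupplier nanoClock) {
        this.delegate = delegate;
        this.nanoClock = nanoClock;
    }

    /** Convenience factory that uses System.nanoTime() as the clock. */
    static <T, R> TimingFunction<T, R> of(Function<T, R> delegate) {
        return new TimingFunction<>(delegate, System::nanoTime);
    }

    @Override
    public R apply(T input) {
        long start = nanoClock.getAsLong();
        try {
            return delegate.apply(input);
        } finally {
            // Only the time spent inside the delegate is counted here,
            // not the rest of the pipeline.
            totalNanos += nanoClock.getAsLong() - start;
            calls++;
        }
    }

    long totalNanos() { return totalNanos; }

    long calls() { return calls; }
}
```

Because `map` accepts a `java.util.function.Function`, an instance of this wrapper could be passed to it directly and its totals read after the flow completes.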
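I can't use generic metrics to rule out the real problem, because I need to see the behaviour under specific conditions (I need a trace span that is triggered by a predicate, for flows in which each operator has its own span).
But with this particular approach I'm seeing something I didn't expect.
Looking at the logs, some of the durations seem odd. For every other onNext() or so, the duration for the "concat" operation is shorter than for the "map" operation. See the log entries for values 1, 3 and 6.
The scheduler used to obtain the nano time does not change. The subscribers seem to behave as expected. I must be overlooking something, and there is something cool to learn here.
Looking at FluxTimed and FluxMapFuseable doesn't immediately give me the answer (but I will keep looking). Or is it simply that going "around" to the next range->map is sometimes much faster than at other times?
Any ideas? Apart from this oddity, everything else seems to work as expected.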
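As a sanity check on how the numbers in the log below are derived, this sketch recomputes `eventElapsedNanos` for values 0 and 1 from the extra `lastEventNanos` and clock lines. It assumes each timed subscriber reports the difference between the current clock reading and its own previous reading. The class and method names are illustrative only.

```java
final class TimedLogCheck {
    /** Elapsed nanos: a subscriber's current clock reading minus its previous one. */
    static long elapsed(long lastEventNanos, long nowNanos) {
        return nowNanos - lastEventNanos;
    }

    public static void main(String[] args) {
        // Readings copied from the log for subscriber @113d0f75
        // (the first timed(), logged as "map").
        long mapValue0 = elapsed(99684449908952L, 99684454327472L);
        long mapValue1 = elapsed(99684454327472L, 99684469639473L);

        // Readings copied from the log for subscriber @7037a680
        // (the second timed(), logged as "concat").
        long concatValue0 = elapsed(99684450236435L, 99684469141337L);
        long concatValue1 = elapsed(99684469141337L, 99684469857807L);

        // Gap between the two timed() readings taken for value 0.
        long gapValue0 = elapsed(99684454327472L, 99684469141337L);

        System.out.println("map    value 0: " + mapValue0);
        System.out.println("map    value 1: " + mapValue1);
        System.out.println("concat value 0: " + concatValue0);
        System.out.println("concat value 1: " + concatValue1);
        System.out.println("gap between the two readings for value 0: " + gapValue0);
    }
}
```

The computed values match the logged `eventElapsedNanos` fields (4418520, 15312001, 18904902 and 716470). The gap of about 14.8 ms between the two readings for value 0 falls inside the first timed()'s interval for value 1 but inside the second timed()'s interval for value 0, which would account for the large difference seen at value 1.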
The logs from the test above (plus some additional logging that I added to FluxTimed) are as follows:
{"@timestamp":"2022-01-13T20:33:43.61Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_SUBSCRIBE","arg1":"[Fuseable] FluxOnAssembly.OnAssemblySubscriber","message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)"}
{"@timestamp":"2022-01-13T20:33:43.63Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_SUBSCRIBE","arg1":"[Fuseable] FluxOnAssembly.OnAssemblySubscriber","message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)"}
{"@timestamp":"2022-01-13T20:33:43.63Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_SUBSCRIBE","arg1":"[Fuseable] FluxOnAssembly.OnAssemblySubscriber","message":"| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)"}
{"@timestamp":"2022-01-13T20:33:43.631Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"REQUEST","arg1":"unbounded","message":"| request(unbounded)"}
{"@timestamp":"2022-01-13T20:33:43.631Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"REQUEST","arg1":"unbounded","message":"| request(unbounded)"}
{"@timestamp":"2022-01-13T20:33:43.632Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"REQUEST","arg1":"unbounded","message":"| request(unbounded)"}
{"@timestamp":"2022-01-13T20:33:43.632Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":0,"message":"| onNext(0)"}
Current thread is: Thread[main,5,main] for value 0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684449908952 for value 0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684454327472 for value 0
{"@timestamp":"2022-01-13T20:33:43.635Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(0){eventElapsedNanos=4418520, eventElapsedSinceSubscriptionNanos=4418520, eventTimestampEpochMillis=1642106023635})"}
Current thread is: Thread[main,5,main] for value value-0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684450236435 for value value-0
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684469141337 for value value-0
{"@timestamp":"2022-01-13T20:33:43.649Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-0){eventElapsedNanos=18904902, eventElapsedSinceSubscriptionNanos=18904902, eventTimestampEpochMillis=1642106023649})"}
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":1,"message":"| onNext(1)"}
Current thread is: Thread[main,5,main] for value 1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684454327472 for value 1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684469639473 for value 1
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(1){eventElapsedNanos=15312001, eventElapsedSinceSubscriptionNanos=19730521, eventTimestampEpochMillis=1642106023650})"}
Current thread is: Thread[main,5,main] for value value-1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684469141337 for value value-1
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684469857807 for value value-1
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-1){eventElapsedNanos=716470, eventElapsedSinceSubscriptionNanos=19621372, eventTimestampEpochMillis=1642106023650})"}
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":2,"message":"| onNext(2)"}
Current thread is: Thread[main,5,main] for value 2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684469639473 for value 2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684470234342 for value 2
{"@timestamp":"2022-01-13T20:33:43.65Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(2){eventElapsedNanos=594869, eventElapsedSinceSubscriptionNanos=20325390, eventTimestampEpochMillis=1642106023650})"}
Current thread is: Thread[main,5,main] for value value-2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684469857807 for value value-2
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684470457218 for value value-2
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-2){eventElapsedNanos=599411, eventElapsedSinceSubscriptionNanos=20220783, eventTimestampEpochMillis=1642106023651})"}
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":3,"message":"| onNext(3)"}
Current thread is: Thread[main,5,main] for value 3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684470234342 for value 3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684470833983 for value 3
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(3){eventElapsedNanos=599641, eventElapsedSinceSubscriptionNanos=20925031, eventTimestampEpochMillis=1642106023651})"}
Current thread is: Thread[main,5,main] for value value-3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684470457218 for value value-3
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684471033203 for value value-3
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-3){eventElapsedNanos=575985, eventElapsedSinceSubscriptionNanos=20796768, eventTimestampEpochMillis=1642106023651})"}
{"@timestamp":"2022-01-13T20:33:43.651Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":4,"message":"| onNext(4)"}
Current thread is: Thread[main,5,main] for value 4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684470833983 for value 4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684471372015 for value 4
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(4){eventElapsedNanos=538032, eventElapsedSinceSubscriptionNanos=21463063, eventTimestampEpochMillis=1642106023652})"}
Current thread is: Thread[main,5,main] for value value-4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684471033203 for value value-4
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684471576037 for value value-4
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-4){eventElapsedNanos=542834, eventElapsedSinceSubscriptionNanos=21339602, eventTimestampEpochMillis=1642106023652})"}
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":5,"message":"| onNext(5)"}
Current thread is: Thread[main,5,main] for value 5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684471372015 for value 5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684471912756 for value 5
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(5){eventElapsedNanos=540741, eventElapsedSinceSubscriptionNanos=22003804, eventTimestampEpochMillis=1642106023652})"}
Current thread is: Thread[main,5,main] for value value-5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684471576037 for value value-5
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684472149299 for value value-5
{"@timestamp":"2022-01-13T20:33:43.652Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-5){eventElapsedNanos=573262, eventElapsedSinceSubscriptionNanos=21912864, eventTimestampEpochMillis=1642106023652})"}
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":6,"message":"| onNext(6)"}
Current thread is: Thread[main,5,main] for value 6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684471912756 for value 6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684472505546 for value 6
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(6){eventElapsedNanos=592790, eventElapsedSinceSubscriptionNanos=22596594, eventTimestampEpochMillis=1642106023653})"}
Current thread is: Thread[main,5,main] for value value-6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684472149299 for value value-6
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684472720603 for value value-6
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-6){eventElapsedNanos=571304, eventElapsedSinceSubscriptionNanos=22484168, eventTimestampEpochMillis=1642106023653})"}
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":7,"message":"| onNext(7)"}
Current thread is: Thread[main,5,main] for value 7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684472505546 for value 7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684473196354 for value 7
{"@timestamp":"2022-01-13T20:33:43.653Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(7){eventElapsedNanos=690808, eventElapsedSinceSubscriptionNanos=23287402, eventTimestampEpochMillis=1642106023653})"}
Current thread is: Thread[main,5,main] for value value-7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684472720603 for value value-7
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684473426400 for value value-7
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-7){eventElapsedNanos=705797, eventElapsedSinceSubscriptionNanos=23189965, eventTimestampEpochMillis=1642106023654})"}
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":8,"message":"| onNext(8)"}
Current thread is: Thread[main,5,main] for value 8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684473196354 for value 8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684473798044 for value 8
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(8){eventElapsedNanos=601690, eventElapsedSinceSubscriptionNanos=23889092, eventTimestampEpochMillis=1642106023654})"}
Current thread is: Thread[main,5,main] for value value-8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684473426400 for value value-8
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684474036903 for value value-8
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-8){eventElapsedNanos=610503, eventElapsedSinceSubscriptionNanos=23800468, eventTimestampEpochMillis=1642106023654})"}
{"@timestamp":"2022-01-13T20:33:43.654Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_NEXT","arg1":9,"message":"| onNext(9)"}
Current thread is: Thread[main,5,main] for value 9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 lastEventNanos is 99684473798044 for value 9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@113d0f75 clock 1309713184 has now time of 99684474393764 for value 9
{"@timestamp":"2022-01-13T20:33:43.655Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(9){eventElapsedNanos=595720, eventElapsedSinceSubscriptionNanos=24484812, eventTimestampEpochMillis=1642106023655})"}
Current thread is: Thread[main,5,main] for value value-9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 lastEventNanos is 99684474036903 for value value-9
For subscriber reactor.core.publisher.FluxTimed$TimedSubscriber@7037a680 clock 1309713184 has now time of 99684474594071 for value value-9
{"@timestamp":"2022-01-13T20:33:43.655Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_NEXT","arg1":{},"message":"| onNext(Timed(value-9){eventElapsedNanos=557168, eventElapsedSinceSubscriptionNanos=24357636, eventTimestampEpochMillis=1642106023655})"}
{"@timestamp":"2022-01-13T20:33:43.656Z","thread_name":"main","level":"INFO","logger_name":"range","arg0":"ON_COMPLETE","arg1":"","message":"| onComplete()"}
{"@timestamp":"2022-01-13T20:33:43.656Z","thread_name":"main","level":"INFO","logger_name":"map","arg0":"ON_COMPLETE","arg1":"","message":"| onComplete()"}
{"@timestamp":"2022-01-13T20:33:43.656Z","thread_name":"main","level":"INFO","logger_name":"concat","arg0":"ON_COMPLETE","arg1":"","message":"| onComplete()"}
Posted on 2022-01-25 16:31:03
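timed() works well if the flow is not iterative. When the flow is iterative, as in the example, you need to take the mechanics into account: the measured duration is the time between signals.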
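The effect can be illustrated with a small, deterministic simulation in plain Java. It does not use Reactor: the clock is a counter advanced by hand, the per-stage costs are made-up tick values, and the class and field names are hypothetical. Each "timer" records the time since its own previous reading, which is the assumption being illustrated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of two interval timers in one synchronous loop. */
final class IntervalSimulation {
    private long now;   // simulated clock in arbitrary ticks
    private long lastA; // previous reading of the first timer
    private long lastB; // previous reading of the second timer

    final List<Long> intervalsA = new ArrayList<>();
    final List<Long> intervalsB = new ArrayList<>();

    /** Runs the loop; every cost is an assumed tick count, not a measurement. */
    void run(int items, long sourceCost, long firstMapCost, long secondMapCost) {
        for (int i = 0; i < items; i++) {
            now += sourceCost;           // source emits the next item
            now += firstMapCost;         // first map runs
            intervalsA.add(now - lastA); // first timer: time since its last signal
            lastA = now;
            now += secondMapCost;        // second map runs
            intervalsB.add(now - lastB); // second timer: time since its last signal
            lastB = now;
        }
    }

    public static void main(String[] args) {
        IntervalSimulation sim = new IntervalSimulation();
        sim.run(3, 1, 2, 10);
        System.out.println("first timer:  " + sim.intervalsA); // [3, 13, 13]
        System.out.println("second timer: " + sim.intervalsB); // [13, 13, 13]
    }
}
```

After the first item both timers report 13 ticks, the cost of one full trip around the loop, even though the first map costs 2 ticks and the second costs 10. Under these assumptions, neither timer isolates the cost of the stage directly above it.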
https://stackoverflow.com/questions/70703233