自然的。智能批处理是流处理中的一种技术,它在不影响延迟的情况下优化吞吐量。在并发队列的示例中,使用者能够在某一时刻原子地清除所有观察到的项,然后将它们作为批处理处理。理想情况下,队列应该是有界的,对批处理大小有一个上限,同时向发件人提供背压。
它被称为“自然”批处理,因为没有强制的批处理大小:当流量低时,它会在到达时立即处理每个项目。在这种情况下,您不需要通过批处理项来进行任何吞吐量优化。当流量增加时,使用者将自动开始处理更大的批,摊销单个操作(如数据库INSERT )的固定延迟。
我编写了一些实现基本目标的代码:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
const val batchLimit = 20
@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
handleItems: (List<T>) -> Unit
) {
val buf = mutableListOf<T>()
while (true) {
receiveOrNull()?.also { buf.add(it) } ?: break
for (x in 2..batchLimit) {
poll()?.also { buf.add(it) } ?: break
}
handleItems(buf)
buf.clear()
}
}我们可以用这个测试它:
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
val chan = generateMockTraffic()
runBlocking {
chan.consumeBatched { println("Received items: $it") }
}
}
@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
return GlobalScope.produce(capacity = batchLimit) {
(1..100).forEach {
send(it)
if (it % 10 == 0) {
delay(1)
}
}
}
}consumeBatched()一次轮询队列一个项目,因此必须附加一个批处理限制。如果针对一个并发队列(如支持drain操作的Agrona项目的drain)编写,则会更理想。
是否有更好的方法与Kotlin渠道,更多的支持,从图书馆?
如果没有,这是否被认为是一个需要添加的特性?
发布于 2018-12-18 10:25:32
是否有更好的方法与Kotlin渠道,更多的支持,从图书馆?
库不支持此功能。
如果没有,这是否被认为是一个需要添加的特性?
它取决于所需的API表面。drain成员不太可能适合于通道语义:它约束实现,它应该以某种方式公开排水限制,并给通道提供更多“类似集合”的API。例如,drain应该如何在无限通道中运行?是否有可能以一种高效的方式(使用预先大小的缓冲区,但避免OOMs和无限集合)实现drain (一旦)并与任何信道实现一起使用它?
可以改进的是来自信道的额外提示,如预期容量和排队元素的计数。它们可以具有轻松的语义和默认实现,并具有一些合理的可配置上限,类似于对drain扩展的提示。将来可以添加这样的API,制造问题可以随意添加。
https://stackoverflow.com/questions/53778062
复制相似问题