首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用emit构建Kotlin流会无限期地运行,并且不会完成

使用emit构建Kotlin流会无限期地运行,并且不会完成
EN

Stack Overflow用户
提问于 2022-01-26 13:12:54
回答 1查看 337关注 0票数 1

我使用了一个java库,我可以用它订阅事件库中的事件。

我可以根据以下SubscirptionListener创建订阅

代码语言:javascript
复制
public abstract class SubscriptionListener {
    public void onEvent(Subscription subscription, ResolvedEvent event) {
    }

    public void onError(Subscription subscription, Throwable throwable) {
    }

    public void onCancelled(Subscription subscription) {
    }
}

每次触发订阅时,我都想作为流的一部分发出ResolvedEvent。但是,对emit的调用没有结束。

代码语言:javascript
复制
    fun flowSubscriptionListener(
        streamName: String,
        options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
        onError: (subscription: Subscription?, throwable: Throwable) -> Unit = { _, _ -> },
        onCancelled: (subscription: Subscription) -> Unit = { _ -> }
    ): Flow<ResolvedEvent> {
        return flow {
            val listener = object : SubscriptionListener() {
                override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
                    logger.info {
                        "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}"
                    }

                    runBlocking {
                        logger.info { "emitting event" }
                        this@flow.emit(event)
                        logger.info { "Event emitted" }
                    }
                }

                override fun onError(subscription: Subscription?, throwable: Throwable) {
                    logger.error {
                        "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
                    }
                    onError(subscription, throwable)
                }

                override fun onCancelled(subscription: Subscription) {
                    logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
                    onCancelled(subscription)
                }
            }
            client.subscribeToStream(streamName, listener).await()
        }.buffer(10)
    }

我有一个示例设置,在这里我等待三个事件的流

代码语言:javascript
复制
flowSubscriptionListener(
            streamName = "SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19",
        ).map {
            it.event.eventType
        }.collect {
            println(it)
        }

然而,我完全没有收到任何事件。控制台输出显示,对emit的调用永远不会终止。

代码语言:javascript
复制
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - Received event 0@SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - emitting event

我期待着"Event emitted"的日志记录

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-26 13:41:48

为了包装基于回调的API,您应该使用callbackFlow。它支持同时排放,我认为这可能是你的问题。

此外,当流本身被取消时,它将正确地处理订阅的取消(通过awaitClose())。

有一种方法可以做到:

代码语言:javascript
复制
fun EventStoreDBClient.flowSubscription(
    streamName: String,
    options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
): Flow<ResolvedEvent> = callbackFlow {
    val listener = object : SubscriptionListener() {
        override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
            logger.info { "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}" }
            logger.info { "Emitting event" }
            trySendBlocking(event)
            logger.info { "Event emitted" }
        }

        override fun onError(subscription: Subscription?, throwable: Throwable) {
            logger.error {
                "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
            }
            close(throwable)
        }

        override fun onCancelled(subscription: Subscription) {
            logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
            close()
        }
    }
    val subscription = subscribeToStream(streamName, listener, options).await()
    awaitClose {
        subscription.stop()
    }
}.buffer(10)

请注意,我还将其转换为EventStoreDBClient上的扩展函数,这在这里似乎是合适的。我删除了错误/取消回调,因为Flow已经处理了这些回调(如果需要,可以将它们放回去)。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70863985

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档