我使用了一个java库,我可以用它订阅事件库中的事件。
我可以根据以下SubscirptionListener创建订阅
public abstract class SubscriptionListener {
public void onEvent(Subscription subscription, ResolvedEvent event) {
}
public void onError(Subscription subscription, Throwable throwable) {
}
public void onCancelled(Subscription subscription) {
}
}每次触发订阅时,我都想作为流的一部分发出ResolvedEvent。但是,对emit的调用没有结束。
fun flowSubscriptionListener(
streamName: String,
options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
onError: (subscription: Subscription?, throwable: Throwable) -> Unit = { _, _ -> },
onCancelled: (subscription: Subscription) -> Unit = { _ -> }
): Flow<ResolvedEvent> {
return flow {
val listener = object : SubscriptionListener() {
override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
logger.info {
"Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}"
}
runBlocking {
logger.info { "emitting event" }
this@flow.emit(event)
logger.info { "Event emitted" }
}
}
override fun onError(subscription: Subscription?, throwable: Throwable) {
logger.error {
"Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
}
onError(subscription, throwable)
}
override fun onCancelled(subscription: Subscription) {
logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
onCancelled(subscription)
}
}
client.subscribeToStream(streamName, listener).await()
}.buffer(10)
}我有一个示例设置,在这里我等待三个事件的流
flowSubscriptionListener(
streamName = "SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19",
).map {
it.event.eventType
}.collect {
println(it)
}然而,我完全没有收到任何事件。控制台输出显示,对emit的调用永远不会终止。
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - Received event 0@SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - emitting event我期待着"Event emitted"的日志记录
发布于 2022-01-26 13:41:48
为了包装基于回调的API,您应该使用callbackFlow。它支持同时排放,我认为这可能是你的问题。
此外,当流本身被取消时,它将正确地处理订阅的取消(通过awaitClose())。
有一种方法可以做到:
fun EventStoreDBClient.flowSubscription(
streamName: String,
options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
): Flow<ResolvedEvent> = callbackFlow {
val listener = object : SubscriptionListener() {
override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
logger.info { "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}" }
logger.info { "Emitting event" }
trySendBlocking(event)
logger.info { "Event emitted" }
}
override fun onError(subscription: Subscription?, throwable: Throwable) {
logger.error {
"Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
}
close(throwable)
}
override fun onCancelled(subscription: Subscription) {
logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
close()
}
}
val subscription = subscribeToStream(streamName, listener, options).await()
awaitClose {
subscription.stop()
}
}.buffer(10)请注意,我还将其转换为EventStoreDBClient上的扩展函数,这在这里似乎是合适的。我删除了错误/取消回调,因为Flow已经处理了这些回调(如果需要,可以将它们放回去)。
https://stackoverflow.com/questions/70863985
复制相似问题