我希望合并两个akka流源,并保留第一个源的ActorRef,以供物化后实际使用。
val buffer = 100
val apiSource: Source[Data, ActorRef] = Source.actorRef[Data](buffer, OverflowStrategy.backpressure)
.delay(2.second, DelayOverflowStrategy.backpressure)
val kafkaSource: Source[Data, Consumer.Control] = createConsumer(config.kafkaConsumerConfig, "test")
val combinedSource: Source[Data, NotUsed] = Source.combine(kafkaSource, apiSource)(Merge(_))问题是combined方法忽略了物化类型,我想知道是否还有其他方法来实现这一点。
发布于 2017-10-16 08:41:58
您可以使用Source#mergeMat
val combinedSource: Source[Data, ActorRef] = kafkaSource.mergeMat(apiSource)(Keep.right)发布于 2017-10-16 08:42:57
这似乎起作用了
def combineAndRetainFirst[T,M1, M2](first: Source[T, M1], second: Source[T, M2]): Source[T, M1] ={
Source.fromGraph(
GraphDSL.create(first, second)((m1, _) => m1){ implicit builder => (g1, g2) =>
import GraphDSL.Implicits._
val merge = builder.add(Merge[T](2))
g1 ~> merge.in(0)
g2 ~> merge.in(1)
SourceShape(merge.out)
}
)
}https://stackoverflow.com/questions/46765118
复制相似问题