我使用akka-grpc来生成客户端绑定。它们的形式通常是
func[A, B](in: Source[A]) : Source[B],
即,他们消费Source[A]并提供Source[B]。
现在,我想把func变成一个Flow[A, B],以便在akka-stream中使用它们。
发布于 2019-03-22 19:34:07
解决方案是:
def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }它使用prefixAndTail劫持底层的Source。
https://stackoverflow.com/questions/55282938
复制相似问题