I need to wait for SourceQueueWithComplete.offer to complete:
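import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future

val config = ConfigFactory.defaultApplication()
implicit val system: ActorSystem = ActorSystem("TestSystem", config)
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ctx = system.dispatcher
val queue =
  Source.queue[Int](5, OverflowStrategy.backpressure)
    .async
    .mapAsync(8) { x: Int =>
      Future {
        println(s"Processed $x")
        x
      }
    }
    .to(Sink.ignore)
    .run()
for (i <- 1 to 10) {
  println(s"Queueing $i...")
  // Future.foreach: the message is printed only once the offer has completed
  for (_ <- queue.offer(i))
    println(s"$i has been queued!")
}
Output: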
Queueing 1...
Exception in thread "main" java.lang.IllegalMonitorStateException
Queueing 2...
at java.lang.Object.wait(Native Method)
Queueing 3...
at java.lang.Object.wait(Object.java:502)
Queueing 4...
at Node1$.delayedEndpoint$Node1$1(Node1.scala:95)
Queueing 5...
at Node1$delayedInit$body.apply(Node1.scala:12)
Queueing 6...
at scala.Function0.apply$mcV$sp(Function0.scala:34)
Queueing 7...
Queueing 8...
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
Queueing 9...
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
Queueing 10...
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:389)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at Node1$.main(Node1.scala:12)
at Node1.main(Node1.scala)
1 has been queued!
2 has been queued!
4 has been queued!
5 has been queued!
3 has been queued!
6 has been queued!
Processed 1
Processed 2
Processed 3
Processed 4
Processed 5
Processed 6
Wrapping queue.offer in a synchronized block does not help.
Posted 2018-01-24 03:03:33
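These errors hint at something related to DelayedInit, which has been deprecated since Scala 2.11.0 (from the Scaladoc: "DelayedInit semantics can be surprising"). Presumably you are using App, which extends DelayedInit, but without seeing more of your code it is hard to confirm that this is the problem.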
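The stack trace above has java.lang.Object.wait at the top; that method throws IllegalMonitorStateException when the calling thread does not hold the object's monitor. As a minimal sketch, assuming the goal is simply to block the main thread until a Future such as the one returned by queue.offer has completed, the entry point below uses an explicit main method rather than App and blocks with Await.result. The names ExplicitMain, pretendOffer and offerAndWait are hypothetical; pretendOffer merely stands in for queue.offer so that the example runs with only the Scala standard library.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ExplicitMain {
  // Hypothetical stand-in for queue.offer: completes asynchronously with its input.
  def pretendOffer(i: Int): Future[Int] = Future(i)

  // Blocks the calling thread until the Future completes or the timeout expires.
  def offerAndWait(i: Int): Int = Await.result(pretendOffer(i), 5.seconds)

  // A plain main method: the body is not routed through DelayedInit.
  def main(args: Array[String]): Unit =
    for (i <- 1 to 3) {
      println(s"Queueing $i...")
      println(s"${offerAndWait(i)} has been queued!")
    }
}
```

Blocking like this is usually acceptable only at the edge of a program, such as a main method or a test; inside stream stages or actors, composing the Futures is generally preferable.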
Alternatively, you can use another Source to feed elements into the queue. The following works:
import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import scala.concurrent._

object Main extends App {
  implicit val system = ActorSystem("TestApp")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val queue =
    Source.queue[Int](5, OverflowStrategy.backpressure)
      .async
      .mapAsync(8) { x: Int => Future(x) }
      .to(Sink.foreach(y => println(s"Processed: $y")))
      .run()

  // mapAsync(1) keeps at most one offer in flight at a time
  Source(1 to 10)
    .mapAsync(1) { x =>
      println(s"Offering: $x")
      queue.offer(x)
    }
    .runWith(Sink.ignore)
}
https://stackoverflow.com/questions/48408183
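The Source-based feeder in the answer relies on mapAsync(1) having at most one queue.offer call in flight at any time. As a rough sketch of the same one-at-a-time discipline without a second stream, the helper below chains the Futures with foldLeft and flatMap. The names SequentialOffers and offerSequentially are hypothetical and not part of Akka; the offer parameter is any function returning a Future, so the block needs only the Scala standard library.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SequentialOffers {
  // Calls `offer` for each item, starting the next call only after the
  // previous Future has completed; results are collected in input order.
  // If any offer fails, the returned Future fails and later items are skipped.
  def offerSequentially[A, R](items: Seq[A])(offer: A => Future[R])(
      implicit ec: ExecutionContext): Future[Vector[R]] =
    items.foldLeft(Future.successful(Vector.empty[R])) { (acc, item) =>
      acc.flatMap(done => offer(item).map(done :+ _))
    }
}
```

With the queue and execution context from the answer in scope, a call such as SequentialOffers.offerSequentially(1 to 10)(queue.offer(_)) should return a Future that completes once every element has been accepted by the queue.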