首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala流和ExecutionContext问题

Scala流和ExecutionContext问题
EN

Stack Overflow用户
提问于 2020-01-07 15:24:22
回答 1查看 93关注 0票数 0

我是Scala的新手,在我的任务中遇到了一些问题:我想构建一个流类,它可以完成三个主要任务:过滤器、地图和forEach。我的流s data is an array of elements. Each of the 3 main tasks should run in 2 different threads on my stream的数组。此外,我需要将动作的逻辑和实际运行划分为两个不同的部分。首先,声明流中的所有任务,只有在运行stream.run()时,才希望实际操作发生。

我的代码:

代码语言:javascript
复制
class LearningStream[A]() {
  val es: ExecutorService = Executors.newFixedThreadPool(2)
  val ec = ExecutionContext.fromExecutorService(es)
  var streamValues: ArrayBuffer[A] = ArrayBuffer[A]()
  var r: Runnable = () => "";

  def setValues(streamv: ArrayBuffer[A]) = {
    streamValues = streamv;
  }

  def filter(p: A => Boolean): LearningStream[A] = {
    var ls_filtered: LearningStream[A] = new LearningStream[A]()
    r = () => {
      println("running real filter..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[A]=es.submit(()=>l.filter(p)).get()
      val b:ArrayBuffer[A]=es.submit(()=>r.filter(p)).get()
      ms_filtered.setValues(a++b)
    }
    return ls_filtered
  }

  def map[B](f: A => B): LearningStream[B] = {
    var ls_map: LearningStream[B] = new LearningStream[B]()
    r = () => {
      println("running real map..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[B]=es.submit(()=>l.map(f)).get()
      val b:ArrayBuffer[B]=es.submit(()=>r.map(f)).get()
      ls_map.setValues(a++b)
    }
    return ls_map
  }

  def forEach(c: A => Unit): Unit = {
    r=()=>{
      println("running real forEach")
      streamValues.foreach(c)}
  }

   def insert(a: A): Unit = {
    streamValues += a
  }

  def start(): Unit = {
    ec.submit(r)
  }

   def shutdown(): Unit = {
    ec.shutdown()
  }
}

我的主语是:

代码语言:javascript
复制
def main(args: Array[String]): Unit = {
    var factorial=0
    val s = new LearningStream[String]
    s.filter(str=>str.startsWith("-")).map(s=>s.toInt*(-1)).forEach(i=>factorial=factorial*i)

    for(i <- -5 to 5){
      s.insert(i.toString)
    }
    println(s.streamValues)
    s.start()
    println(factorial)
    }

主打印只输出滤波器的输出,析因不变(仍为1)。我在这里错过了什么?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-10 17:24:23

我的解决方案:@Levi在评论中留下了一些很好的提示,如果您想得到提示而不是真正的解决方案。

第一个问题:只有一个命令(筛选器)运行,另一个没有运行。解决方案:通过以下方法向每个命令的可运行性中插入下一个流的调用:

代码语言:javascript
复制
ec.submit(ms_map.r)

为了能够关闭所有会话,我们需要向类中添加另一个LearningStream数据成员。但是,我们不能仅仅添加一个常规的LearningStream对象,因为它依赖于参数A。因此,我实现了一个具有关闭函数的特性,并且我的数据成员是该特征类型。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59631317

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档