首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星星之火:并行运行外部进程

星星之火:并行运行外部进程
EN

Stack Overflow用户
提问于 2015-12-04 16:07:12
回答 2查看 2.8K关注 0票数 5

Spark是否有可能“包装”并运行管理其输入和输出的外部流程?

进程由通常从命令行运行的普通C/C++应用程序表示。它接受一个纯文本文件作为输入,并生成另一个纯文本文件作为输出。由于我需要将这个应用程序的流程与更大的东西(总是在Spark中)集成起来,所以我想知道是否有办法做到这一点。

进程可以很容易地并行运行(目前我使用GNU并行),只需将其输入分成(例如)10个部件文件,在内存中运行10个实例,并在一个文件中重新加入最后的10个部分文件输出。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-12-04 17:50:43

您可以做的最简单的事情是编写一个简单的包装器,它从标准输入获取数据,写入文件,执行外部程序,并将结果输出到标准输出。之后,您所要做的就是使用pipe方法:

代码语言:javascript
复制
rdd.pipe("your_wrapper")

唯一严肃的考虑是IO性能。如果可能的话,最好调整你想要调用的程序,这样它就可以直接读写数据而不需要经过磁盘。

另外,您可以使用mapPartitionsprocess和标准IO工具相结合,写入本地文件,调用程序并读取输出。

票数 5
EN

Stack Overflow用户

发布于 2020-02-07 17:37:02

如果你以谷歌搜索中的问题标题结束,但你没有外部程序需要从文件中读取的操作限制--也就是说,如果你的外部程序可以从stdin读取--这是一个解决方案。对于我的用例,我需要为每个输入文件调用一个外部解密程序。

代码语言:javascript
复制
import org.apache.commons.io.IOUtils
import sys.process._
import scala.collection.mutable.ArrayBuffer

val showSampleRows = true
val bfRdd = sc.binaryFiles("/some/files/*,/more/files/*")
val rdd   = bfRdd.flatMap{ case(file, pds) => {  // pds is a PortableDataStream
    val rows   = new ArrayBuffer[Array[String]]()
    var errors = List[String]()
    val io     = new ProcessIO (
        in  => {  // "in" is an OutputStream; write the encrypted contents of the 
                  // input file (pds) to this stream
            IOUtils.copy(pds.open(), in)  // open() returns a DataInputStream
            in.close
        },
        out => {  // "out" is an InputStream; read the decrypted data off this stream.
            // Even though this runs in another thread, we can write to rows, since it
            // is part of the closure for this function
            for(line <- scala.io.Source.fromInputStream(out).getLines) {
                // ...decode line here... for my data, it was pipe-delimited
                rows += line.split('|')
            }
            out.close
        },
        err => {  // "err" is an InputStream; read any errors off this stream
            // errors is part of the closure for this function
            errors = scala.io.Source.fromInputStream(err).getLines.toList
            err.close
        }
    )
    val cmd       = List("/my/decryption/program", "--decrypt")
    val exitValue = cmd.run(io).exitValue  // blocks until subprocess finishes
    println(s"-- Results for file $file:")
    if (exitValue != 0) {  
        // TBD write to string accumulator instead, so driver can output errors
        // string accumulator from @zero323: https://stackoverflow.com/a/31496694/215945
        println(s"exit code: $exitValue")
        errors.foreach(println)
    } else {
        // TBD, you'll probably want to move this code to the driver, otherwise
        // unless you're using the shell, you won't see this output
        // because it will be sent to stdout of the executor
        println(s"row count: ${rows.size}")
        if (showSampleRows) {
            println("6 sample rows:")
            rows.slice(0,6).foreach(row => println("  " + row.mkString("|")))
        }
    }
    rows
}}
代码语言:javascript
复制
scala> :paste "test.scala"
Loading test.scala...
...
rdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[62] at flatMap at <console>:294

scala> rdd.count  // action, causes Spark code to actually run
-- Results for file hdfs://path/to/encrypted/file1:  // this file had errors
exit code: 255
ERROR: Error decrypting
my_decryption_program: Bad header data[0]
-- Results for file hdfs://path/to/encrypted/file2:
row count: 416638
sample rows:
  <...first row shown here ...>
  ...
  <...sixth row shown here ...>
...
res43: Long = 843039

参考文献:

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34092517

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档