首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RDD管道每行一个外部进程

RDD管道每行一个外部进程
EN

Stack Overflow用户
提问于 2018-02-24 19:40:29
回答 1查看 299关注 0票数 1

我目前正在使用apache spark作为工作流系统,处于独立模式。我的上下文是我有一个csv文件,其中每一行都描述了我的模拟的一个实例的参数。

我的输入数据如下所示:

代码语言:javascript
复制
id,socket,type,platform,workload
00001,28001,fcfs,platform.xml,workload1.json
00002,28002,fcfs,platform.xml,workload2.json

所以我会用下面的命令调用模拟器

代码语言:javascript
复制
simulator --tcp-port 28001 -arg1 fcfs -arg2 platform.xml -arg3 workload.json
代码语言:javascript
复制
// Simple user-defined function to call
// sc.addFile to all file of the local folder
addFolderToContext(sc, input_folder) 
// Loading the csv from my file system
val inputsRDD = spark.read.format("csv")
   .schema(schema) 
   .option("header", "true")
   .load(input_folder + "/inputs.csv")

// Some preprocessing filtering
val simulationsFCFS = inputsRDD.filter($"type" === "fcfs")
   .collect()
   .map(r => new String(r.mkString("",",","")))
   .take(2) // Only two for testing

sc.parallelize(simulationsFCFS)
    .map(r => r)
    // Since each line is a simulation, 
    // I want to have one forked process per line 
    // .repartition(simulationsFCFS.size) // I try this, but it does not work.
    .pipe(Seq(scriptFcfs),
     Map(),
     null,
     null,
     false,
     1024, 
     Codec.defaultCharsetCodec.name) 
    .collect()

我已经能够使用RDD.pipe函数调用我的外部程序。但就目前而言,我不得不使用一个包装器脚本来循环/dev/stdin,并为每一行调用我的包装器。这是因为管道函数将来自一个分区的所有行应用于我的程序的相同子进程,而不是每个“行”。

然而,我可以实现良好的并行性,这是我的需求之一(能够并行化我的模拟),我对特定的实例没有太多的控制。我希望的是对我的条目csv的每一行都有一个对外部进程的调用。好处是能够跟踪失败的模拟,因为它们将是一个标准的spark任务。有没有人能帮我实现它?

非常感谢。

EN

回答 1

Stack Overflow用户

发布于 2018-02-27 03:14:40

如果您想继续使用spark.pipe功能,为什么不在需要时为特定的输入行执行一个进程。我假设您不需要为每个RDD记录使用单独的进程,而只需要为某些记录使用单独的进程。如果您确实需要为每个记录使用它,那么只需执行一个mapPartition()并为每个记录启动您自己的可执行文件。另一种笨拙的方法是继续使用RDD.pipe,但重新分区,以便每个分区只有一条记录。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48962434

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档