我目前正在使用apache spark作为工作流系统,处于独立模式。我的上下文是我有一个csv文件,其中每一行都描述了我的模拟的一个实例的参数。
我的输入数据如下所示:
id,socket,type,platform,workload
00001,28001,fcfs,platform.xml,workload1.json
00002,28002,fcfs,platform.xml,workload2.json所以我会用下面的命令调用模拟器
simulator --tcp-port 28001 -arg1 fcfs -arg2 platform.xml -arg3 workload.json// Simple user-defined function to call
// sc.addFile to all file of the local folder
addFolderToContext(sc, input_folder)
// Loading the csv from my file system
val inputsRDD = spark.read.format("csv")
.schema(schema)
.option("header", "true")
.load(input_folder + "/inputs.csv")
// Some preprocessing filtering
val simulationsFCFS = inputsRDD.filter($"type" === "fcfs")
.collect()
.map(r => new String(r.mkString("",",","")))
.take(2) // Only two for testing
sc.parallelize(simulationsFCFS)
.map(r => r)
// Since each line is a simulation,
// I want to have one forked process per line
// .repartition(simulationsFCFS.size) // I try this, but it does not work.
.pipe(Seq(scriptFcfs),
Map(),
null,
null,
false,
1024,
Codec.defaultCharsetCodec.name)
.collect()我已经能够使用RDD.pipe函数调用我的外部程序。但就目前而言,我不得不使用一个包装器脚本来循环/dev/stdin,并为每一行调用我的包装器。这是因为管道函数将来自一个分区的所有行应用于我的程序的相同子进程,而不是每个“行”。
然而,我可以实现良好的并行性,这是我的需求之一(能够并行化我的模拟),我对特定的实例没有太多的控制。我希望的是对我的条目csv的每一行都有一个对外部进程的调用。好处是能够跟踪失败的模拟,因为它们将是一个标准的spark任务。有没有人能帮我实现它?
非常感谢。
发布于 2018-02-27 03:14:40
如果您想继续使用spark.pipe功能,为什么不在需要时为特定的输入行执行一个进程。我假设您不需要为每个RDD记录使用单独的进程,而只需要为某些记录使用单独的进程。如果您确实需要为每个记录使用它,那么只需执行一个mapPartition()并为每个记录启动您自己的可执行文件。另一种笨拙的方法是继续使用RDD.pipe,但重新分区,以便每个分区只有一条记录。
https://stackoverflow.com/questions/48962434
复制相似问题