我有一个dir,几乎100个日志文件,每个重在10~15 GB。其要求是逐行读取每个文件(订单根本不重要),清理json行并将其转储到后端elasticsearch存储区进行索引。
这是我的工人做这份工作
# file = worker.php
echo " -- New PHP Worker Started -- "; // to get how many times gnu-parallel initiated the worker
$dataSet = [];
while (false !== ($line = fgets(STDIN))) {
// convert line text to json
$l = json_decode($line);
$dataSet[] = $l;
if(sizeof($dataSet) >= 1000) {
//index json to elasticsearch
$elasticsearch->bulkIndex($dataSet);
$dataSet = [];
}
}有了答案here和here的帮助,我马上就到了,它正在工作(某种程度上),但只需要确保它实际上是在做我假设它正在做的事情。
只要有一个文件,我就可以按以下方式处理
parallel --pipepart -a 10GB_input_file.txt --round-robin php worker.php 效果很好。添加-循环确保php工作进程只启动一次,然后它就会继续以管道的形式接收数据(可怜人的队列)。
因此,对于4CPU机器,它会启动4个php工作人员,并非常快速地处理所有数据。
要对所有文件做同样的操作,下面是我对它的看法
find /data/directory -maxdepth 1 -type f | parallel cat | parallel --pipe -N10000 --round-robin php worker.php 这看起来有点像工作,但我有一种直觉,这是一个错误的方式嵌套并行所有的文件。
其次,由于它不能使用--管道部件,我认为它会慢一些。
第三,一旦工作完成,我看到在4 4cpu机器上,只有4名工人开始工作,并完成了工作。行为正确吗?它不应该为每个文件启动4名员工吗?只是想确保我没有漏掉任何数据。
你知道怎么才能用更好的方式做这件事吗?
发布于 2018-10-25 07:46:10
如果它们大小大致相同,那么为什么不简单地给每个文件一个文件:
find /data/directory -maxdepth 1 -type f |
parallel php worker.php '<' {}另一种方法是对其中每一个使用--pipepart:
do_one() {
parallel --pipepart -a "$1" --block -1 php worker.php
}
export -f do_one
find /data/directory -maxdepth 1 -type f | parallel -j1 do_one如果启动php worker.php不需要很长时间,那么最后一个可能更好,因为如果文件大小非常不同,那么它的分布将更加均匀,因此,如果最后一个文件很大,则不会等待单个进程完成该文件的处理。
https://stackoverflow.com/questions/52981773
复制相似问题