我正在尝试使用Python驱动程序来运行一个迭代的MRjob程序。退出条件取决于计数器。
作业本身似乎正在运行。如果我从命令行运行一次迭代,那么我就可以hadoop fs -cat /user/myname/myhdfsdir/part-00000并查看单次迭代的预期结果。
但是,我需要使用Python驱动程序来运行代码并从runner访问计数器。这是因为它是一种迭代算法,需要计数器的值来确定退出标准。
OUTPUT_PATH = /user/myname/myhdfsdir
!hadoop fs -rm -r {OUTPUT_PATH}
from my_custom_MRjob import my_custom_MRjob
mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt",
"-r", "hadoop",
"--output-dir=hdfs://"+OUTPUT_PATH,
"--no-output"])
while True:
with mr_job.make_runner() as runner:
print runner.get_opts()
runner.run()
with open('localDir/localTextFile.txt', 'w') as f:
for line in runner.stream_output():
key,value = mr_job.parse_output_line(line)
#
f.write(key +'\t'+ value +'\n')
print "End of MRjob iteration. Counters: {}".format(runner.counters())
# read a particular counter
# use counter value to evaluate exit criteria
if exit_criteria_met:
break这会产生以下错误:
IOErrorTraceback (most recent call last)
<ipython-input-136-aded8ecaa727> in <module>()
25 runner.run()
26 with open('localDir/localTextFile.txt', 'w') as f:
---> 27 for line in runner.stream_output():
28 key,value = mr_job.parse_output_line(line)
29 #
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/util.pyc in _to_lines(chunks)
391 leftovers = []
392
--> 393 for chunk in chunks:
394 # special case for b'' standing for EOF
395 if chunk == b'':
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/runner.pyc in cat_output(self)
555 yield b'' # EOF of previous file
556
--> 557 for chunk in self.fs._cat_file(filename):
558 yield chunk
559
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/composite.pyc in _cat_file(self, path)
75
76 def _cat_file(self, path):
---> 77 for line in self._do_action('_cat_file', path):
78 yield line
79
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/hadoop.pyc in _cat_file(self, filename)
272
273 if returncode != 0:
--> 274 raise IOError("Could not stream %s" % filename)
275
276 def mkdir(self, path):
IOError: Could not stream hdfs://hdfs:/user/myname/myhdfsdir/part-00000尤其令人困惑和沮丧的是:hdfs://hdfs:/user/myname/myhdfsdir/part-00000。注意,URL中有两个hdfs方案,但在第二个hdfs实例中只有一个正斜杠。我已经尝试在mrjob args:"--output-dir=hdfs://"+OUTPUT_PATH中添加和删除文本hdfs://。在这两种情况下,我都得到了相同的错误签名。
如果我在“本地”模式而不是Hadoop模式下运行驱动程序,就不会有任何问题,只有一个明显而关键的例外,那就是我无法访问Hadoop引擎。这可以很好地工作:
mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt"])我需要读取初始输入文件,总是从本地文件系统读取(即使在Hadoop模式下也是如此)。然后运行MRjob迭代,其输出覆盖本地文件系统输入文件。然后从runner访问计数器并评估退出条件。如果不满足退出条件,则使用来自本地文件系统的输入再次运行作业,这一次使用上次运行时更新的本地输入文件。
发布于 2020-07-31 17:08:36
只要你有一个包含hdfs:/的路径,你就不会成功,因为它永远不会是有效的。
您在评论中提到您尝试手动添加hdfs://,这可能是一个很好的技巧,但在您的代码中,我没有看到您“清理”了错误的hdfs:/。因此,即使您添加了正确的前缀,下一行内容也将是错误的,并且代码仍然没有成功的机会。
所以,请把它清理干净。
实用提示:这个问题是一段时间以前的问题,如果软件本身有问题,现在可能已经解决了。如果问题仍然存在,很可能是您尝试使用的代码中有一些奇怪的东西。也许可以从一个可靠来源的微不足道的例子开始排除这种可能性。
https://stackoverflow.com/questions/49472471
复制相似问题