我正在尝试用 Python 学习 Apache Beam,但我的示例都无法运行。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Build the pipeline options; no arguments are passed here.
options = PipelineOptions()
# Create the pipeline with those options.
p = beam.Pipeline(options=options)
# Relative path of the text file to read.
input_filename = "./data/kinglear.txt"
# Apply the ReadFromText transform to the pipeline; the result of the
# expression is not bound to a name.
p | beam.io.ReadFromText(input_filename)
result = p.run()
结果:
Traceback (most recent call last):
File "C:/Projects/Perm/PythonPOC/Beam/Main.py", line 10, in <module>
result = p.run()
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\pipeline.py", line 416, in run
self._options).run(False)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\pipeline.py", line 429, in run
return self.runner.run_pipeline(self, self._options)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 135, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 389, in run_pipeline
default_environment=self._default_environment))
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 396, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 478, in run_stages
stage_context.safe_coders)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 774, in _run_stage
result, splits = bundle_manager.process_bundle(data_input, data_output)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1811, in process_bundle
part, expected_outputs), part_inputs):
File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 586, in result_iterator
yield fs.pop().result()
File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 432, in result
return self.__get_result()
File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
raise self._exception
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 42, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1811, in <lambda>
part, expected_outputs), part_inputs):
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1747, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1173, in push
response = self.worker.do_instruction(request)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 291, in do_instruction
request.instruction_id)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 317, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 675, in process_bundle
data.transform_id].process_encoded(data.data)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 146, in process_encoded
self.output(decoded_value)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\operations.py", line 259, in output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "C:\ProgramData\Anaconda3\lib\site-packages\Cython\Shadow.py", line 165, in cast
return type(*args)
TypeError: Receiver() takes no arguments
论坛上的示例我一个都无法运行,它们全都会导致同样的错误。我不确定需要做些什么来修复它,我是 Apache Beam 的新手。
发布于 2020-02-04 21:45:30
我也遇到过同样的问题,你使用的是什么版本的Python?
我不确定是 Python 版本还是环境有问题,但我通过使用 Python 3.5.6 创建一个新环境(我用的是 conda)并重新安装 apache-beam 解决了这个问题。代码本身没有问题。
*编辑:问题似乎来自 Cython,apache-beam 与 Cython 这两个包之间存在不兼容。卸载 Cython 看来可以解决这个问题。已向 Apache Beam 团队提交的问题:https://issues.apache.org/jira/browse/BEAM-9324
发布于 2020-07-19 01:51:51
Apache Beam 中这个问题仍然存在。另一种解决方案是为 DirectRunner 使用 virtualenv。Apache Beam 官方文档中已经有一个简短的教程。新创建的 virtualenv 不会自动带上 Cython,从而避免了这个错误。
https://stackoverflow.com/questions/60052098
复制相似问题