I'm trying to read some data from BigQuery and some data from the file system, using the code below.
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
However, when I run this pipeline, I get the following error:
  File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
    run()
  File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
    p.run().wait_until_finish()
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)
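But if I run my code like this:
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
apn1 = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
or like this:
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
preprocess_rows1 = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
both versions run without errors. I can't figure out what causes this error. Is it a limitation of Apache Beam that a single pipeline can only read from one type of data source?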
Posted on 2020-03-01 15:31:20
I get the same error when doing the same kind of thing, pulling data from both BigQuery and the file system:
lines = p | "Read Input Parameters" >> ReadFromText(options.input)
past_posts = p | "Get Past Posts From BigQuery" >> Read(BigQuerySource(query=f"SELECT url FROM {full_bq_table_id}", use_standard_sql=False))
Error:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/craigslist_pipeline.py", in <module>
    full_bq_table_id=f"apartment-data-project:{dataset}.craigslist_posts")
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/craigslist_pipeline.py", line 14, in run
    result = p.run()
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)
I'm also wondering why you can't read from different sources in one pipeline.
Posted on 2020-04-15 13:40:47
This is a bug in the DirectRunner in Apache Beam v2.19. A fix has been made but not yet released. Downgrade apache-beam to 2.16 (pip install apache-beam==2.16) and it will work.
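Until the downgrade is in place, a small guard can fail fast with a readable message instead of the `create_invoker()` TypeError. This is a sketch: it flags only the 2.19.x line, because that is the release this answer reports as affected, and it does not claim anything about other releases. The helper names are hypothetical.

```python
def beam_major_minor(version):
    """Return (major, minor) from a version string such as "2.19.0"."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def is_reported_buggy(version):
    """True for 2.19.x, the release named in this answer as having the DirectRunner bug."""
    return beam_major_minor(version) == (2, 19)


def guard_direct_runner(version):
    """Raise early, with an explicit message, on a version reported as affected."""
    if is_reported_buggy(version):
        raise RuntimeError(
            f"apache-beam {version} has a reported DirectRunner bug; "
            "try `pip install apache-beam==2.16`."
        )
```

In a pipeline script it would be called once before the pipeline is built, for example `guard_direct_runner(apache_beam.__version__)`.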
https://stackoverflow.com/questions/60474002