I am trying to set up a Dataflow job that converts JSON files to CSV and writes them to a bucket, using the Python script below. (I tried this on Python 3.8.13, since I am using Apache Beam.) I have tried changing to many versions of Python and google-cloud-storage. Is there an alternative approach that does not use the storage library?
import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage
from smart_open import open

class WriteCSVFile(beam.DoFn):
    def __init__(self, bucket_name):
        self.bucket_name = bucket_name

    def start_bundle(self):
        self.client = storage.Client()

    def process(self, mylist):
        df = pd.DataFrame(mylist, columns={'account_id': str, 'isActive': str, 'balance': str, 'age': str, 'eyeColor': str, 'name': str, 'gender': str, 'company': str, 'email': str, 'phone': str, 'address': str})
        bucket = self.client.get_bucket(self.bucket_name)
        bucket.blob("output_poc4.csv").upload_from_string(df.to_csv(index=False), 'text/csv')

Below is the error log:
  File "/home/myprject/dataflow_poc.py", line 86, in <module>
    run()
  File "/home/myprject/dataflow_poc.py", line 79, in run
    (pipeline | 'Start' >> beam.Create([None])
  File "/home/myprject/.pyenv/versions/dataflow/lib/python3.8/site-packages/apache_beam/pipeline.py", line 598, in __exit__
    self.result.wait_until_finish()
  File "/home/myprject/.pyenv/versions/dataflow/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1673, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 559, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/home/myprject/dataflow_poc.py", line 53, in start_bundle
NameError: name 'storage' is not defined

Below are a few of the packages from my pip freeze:
apache-beam==2.40.0
bcrypt==3.2.2
cachetools==4.2.4
certifi==2022.6.15
cffi==1.15.1
charset-normalizer==2.1.0
cloudpickle==2.1.0
crcmod==1.7
cryptography==37.0.2
dill==0.3.1.1
google-api-core==1.31.6
google-apitools==0.5.31
google-auth==1.35.0
google-auth-httplib2==0.1.0
google-cloud==0.34.0
google-cloud-bigquery==2.34.4
google-cloud-bigquery-storage==2.13.2
google-cloud-bigtable==1.7.2
google-cloud-core==1.7.2
google-cloud-datastore==1.15.5
google-cloud-dlp==3.7.1
google-cloud-language==1.3.2
google-cloud-pubsub==2.13.0
google-cloud-pubsublite==1.4.2
google-cloud-recommendations-ai==0.2.0
google-cloud-spanner==1.19.3
google-cloud-storage==2.4.0
google-cloud-videointelligence==1.16.3
google-cloud-vision==1.0.2
google-crc32c==1.3.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.3

Posted on 2022-07-06 14:06:37
You need to move from google.cloud import storage into the function that uses it; in your case, it must go inside the start_bundle function.
You can refer to the troubleshooting documentation for this NameError, which also notes:
By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job.
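The fix above can be sketched as follows. Note this is a minimal illustration of the serialization point only: the beam.DoFn base class is omitted so the snippet runs without Apache Beam installed, and storage.Client() is never actually called here since it would require GCP credentials. The idea is that an import placed inside the method is resolved when the method runs on the Dataflow worker, not when the DoFn is pickled on the launcher machine, so no NameError occurs.

```python
import pickle

class WriteCSVFile:  # in the real job this subclasses beam.DoFn
    def __init__(self, bucket_name):
        self.bucket_name = bucket_name

    def start_bundle(self):
        # Import inside the method: the name 'storage' is looked up
        # only when start_bundle executes on the worker, so it does not
        # need to survive serialization of the main session.
        from google.cloud import storage
        self.client = storage.Client()

# The instance pickles cleanly; only bucket_name travels to the worker,
# and the storage import happens there at bundle start.
dofn = WriteCSVFile("my-bucket")
restored = pickle.loads(pickle.dumps(dofn))
print(restored.bucket_name)  # → my-bucket
```

Alternatively, setting the save_main_session pipeline option to True asks Beam to pickle the main session's global state, including top-level imports, though moving imports into the functions that use them is the more robust fix.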
Posted on 2022-07-06 12:33:39
Add from google.cloud import storage to the start_bundle definition.
https://stackoverflow.com/questions/72881401