我想编写一个管道来从数据存储中迁移一些数据并将其导出到一个csv中。出于这个原因,我正在考虑做:
我编写了这段代码,但我不确定我的想法是否正确,也不确定最后一步到底需要写什么。相反,有什么直接的方法从Datastore获得csv?
from google.cloud import datastore
from google.cloud.datastore import query as datastore_query
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
import apache_beam as beam
from apache_beam.io.BigQueryDisposition import CREATE_IF_NEEDED
from apache_beam.io.BigQueryDisposition import WRITE_TRUNCATE
def proto_to_dict(proto_obj):
key_list = proto_obj.DESCRIPTOR.fields_by_name.keys()
d = {}
for key in key_list:
d[key] = getattr(proto_obj, key)
return d
p = beam.Pipeline(options=pipeline_options)
ds_client = datastore.Client(project=project)
query = ds_client.query(kind=kind)
query = datastore_query._pb_from_query(query)
input = p | 'ReadFromDatastore' >> ReadFromDatastore(project=project, query=query)
pipeline = (
input
| 'convert to dict' >> beam.Pardo(proto_to_dict())
| 'write to big query' >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| 'export big query as csv' >> #i need to add the correct code
)
output = pipeline |beam.Map(print)发布于 2022-11-03 18:45:40
如果你想要一个CSV,你应该写到云存储。没有必要使用BigQuery。
使用PTransform来转换您的DataStore检索的数据,考虑使用像csv这样的python库来正确转义引号(任何字符串数据都可能包含逗号,这会扰乱您的CSV文件)。
发布于 2022-11-03 22:53:17
在这种情况下不需要Dataflow。
没有将datastore数据直接导出到CSV文件的功能。
一个可能的解决办法是:
Cloud Storage存储桶gcloud datastore export --kinds="KIND" gs://your_bucketbq命令bq --location=US load \
--source_format=DATASTORE_BACKUP \
mydataset.book_data \
gs://mybucket/20180228T1256/default_namespace/kind_Book/default_namespace_kind_Book.export_metadatabq命令:bq extract \
--compression GZIP \
'mydataset.mytable' \
gs://example-bucket/myfile.csv如果您必须自动化这个过程,可以在shell脚本中自动执行这些步骤。
如果要对数据应用业务规则和转换,则更多地使用Dataflow。对你来说似乎不是这样。
https://stackoverflow.com/questions/74307396
复制相似问题