我有自己的代码来将json转换成csv ..我需要把这段代码放在gcp中,并开发一个数据流作业(它将从云存储读取json并转换成csv,然后再放到云存储中)
p = beam.Pipeline(options=PipelineOptions())
class to_csv(beam.DoFn):
def process(self,f):
columns_list = ["col1",]
with open(f,"r") as f1:
dd= ast.literal_eval(f1.readlines()[0])
for each in dd['data']:
for ELEMENT in each["values"]:
for KEY,VALUE in ELEMENT.items():
if KEY=="value" and type(VALUE)==dict:
columns_list.extend(VALUE.keys())
new_col_list = list(set(columns_list))
sample = pd.DataFrame(columns=new_col_list)
#Adding values to the table structure
for each in range(len(dd['data'])):
empty_dict = {}
empty_dict["col1"] = dd['data'][each]["id"]
["description"]
for ELEMENT in dd['data'][each]["values"]:
for KEY,VALUE in ELEMENT.items():
if KEY=="value":
if type(VALUE)==int:
empty_dict["value"]=VALUE
elif type(VALUE)==dict:
temp_df = pd.DataFrame().from_dict(VALUE,orient="index").T
for ind in temp_df.columns:
empty_dict[ind] = temp_df[ind][0]
elif KEY=="end_time":
end_time_lis = VALUE
empty_dict["end_time"] = end_time_lis
sample = sample.append(empty_dict,ignore_index=True)
parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
result =p.run()
#loading the data from the source file
data_from_source = (p | 'ReadMyFile' >> ReadFromText("sourcepath"))
data_from_source | 'Convert To Csv' >> beam.ParDo(to_csv())| 'exportresult'>>WriteToText('outputpath')发布于 2019-08-05 20:43:39
这可以通过在Google Cloud Function上编写来轻松完成。
Google函数是无服务器的,您只需要为此函数执行的持续时间付费(如果不是免费订阅)。
发布于 2019-08-06 00:19:09
波束TextIO支持从GCS读取和写入。因此,要从GCS读取数据,请将“ReadFromText(”sourcepath“)”替换为“ReadFromText(”gs://my-bucket/sourcepath“)”。
如果你想要一个单独的文件,那么写起来就有点困难了。您可以将"WriteToText('outputpath')“替换为"WriteToText('gs://my-bucket/outprefix')",它将为每个分片写入一个文件gs://my-bucket/outprefix-0001-of-00262。如果您需要单个文件输出,则Cloud Functions是合并输出文件的一个选项。
https://stackoverflow.com/questions/57357622
复制相似问题