首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用我们的自定义代码(将json转换为csv)转换为Google cloud数据流?

如何使用我们的自定义代码(将json转换为csv)转换为Google cloud数据流?
EN

Stack Overflow用户
提问于 2019-08-05 19:25:30
回答 2查看 485关注 0票数 0

我有自己的代码来将json转换成csv ..我需要把这段代码放在gcp中,并开发一个数据流作业(它将从云存储读取json并转换成csv,然后再放到云存储中)

代码语言:javascript
复制
p = beam.Pipeline(options=PipelineOptions())
class to_csv(beam.DoFn): 
    def process(self,f):
        columns_list = ["col1",]
        with open(f,"r") as f1:
            dd= ast.literal_eval(f1.readlines()[0])
            for each in dd['data']:
                for ELEMENT in each["values"]:
                    for KEY,VALUE in ELEMENT.items():

                        if KEY=="value" and type(VALUE)==dict:
                            columns_list.extend(VALUE.keys())
        new_col_list = list(set(columns_list))
        sample = pd.DataFrame(columns=new_col_list)

    #Adding values to the table structure

        for each in range(len(dd['data'])):
            empty_dict = {}
            empty_dict["col1"] = dd['data'][each]["id"]
            ["description"]


            for ELEMENT in dd['data'][each]["values"]:
                for KEY,VALUE in ELEMENT.items():
                    if KEY=="value":
                        if type(VALUE)==int:
                            empty_dict["value"]=VALUE
                        elif type(VALUE)==dict:
                            temp_df = pd.DataFrame().from_dict(VALUE,orient="index").T
                            for ind in temp_df.columns:
                                empty_dict[ind] = temp_df[ind][0]       
                    elif KEY=="end_time":
                        end_time_lis = VALUE
            empty_dict["end_time"] = end_time_lis

            sample = sample.append(empty_dict,ignore_index=True)



parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
result =p.run()
#loading the data from the source file
data_from_source = (p | 'ReadMyFile' >> ReadFromText("sourcepath"))
data_from_source | 'Convert To Csv' >> beam.ParDo(to_csv())| 'exportresult'>>WriteToText('outputpath')
EN

回答 2

Stack Overflow用户

发布于 2019-08-05 20:43:39

这可以通过在Google Cloud Function上编写来轻松完成。

Google函数是无服务器的,您只需要为此函数执行的持续时间付费(如果不是免费订阅)。

https://cloud.google.com/functions/

https://cloud.google.com/functions/docs/quickstart-console

票数 0
EN

Stack Overflow用户

发布于 2019-08-06 00:19:09

波束TextIO支持从GCS读取和写入。因此,要从GCS读取数据,请将“ReadFromText(”sourcepath“)”替换为“ReadFromText(”gs://my-bucket/sourcepath“)”。

如果你想要一个单独的文件,那么写起来就有点困难了。您可以将"WriteToText('outputpath')“替换为"WriteToText('gs://my-bucket/outprefix')",它将为每个分片写入一个文件gs://my-bucket/outprefix-0001-of-00262。如果您需要单个文件输出,则Cloud Functions是合并输出文件的一个选项。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57357622

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档