
NotImplementedError apache beam python

Asked by a Stack Overflow user on 2019-09-11 10:34:02
1 answer, 384 views, 0 followers, 0 votes

I am using Apache Beam to write JSON to GCS, but I ran into the following error:

NotImplementedError: offset: 0, whence: 0, position: 50547, last: 50547 [while running 'Writing new data to gcs/write data gcs/Write/WriteImpl/WriteBundles/WriteBundles']

I don't know why this error occurs. The code is as follows:

class WriteDataGCS(beam.PTransform):
        """
        To write data to GCS
        """

        def __init__(self, bucket):
            """
            Initiate the bucket as a class field

            :type bucket:string
            :param bucket: query to be run for data
            """
            self.bucket = bucket

        def expand(self, pcoll):
            """
            PTransform Method run when called on Class Name

            :type pcoll: PCollection
            :param pcoll: A pcollection
            """
            (pcoll | "print intermediate" >> beam.Map(print_row))
            return (pcoll | "write data gcs" >> beam.io.WriteToText(self.bucket, coder=JsonCoder(), file_name_suffix=".json"))
class JsonCoder:
    """
    This class represents dump and load operations performed on json
    """
    def encode(self,data):
        """
        Encodes the json data.

        :type data: string
        :param data: Data to be encoded
        """
        # logger.info("JSON DATA for encoding - {}".format(data))
        return json.dumps(data,default=str)

    def decode(self,data):
        """
        Decodes the json data.

        :type data: string
        :param data: Data to be decoded
        """
        # logger.info("JSON DATA for decoding - {}".format(data))
        return json.loads(data)

1 Answer

Stack Overflow user

Accepted answer

Posted on 2019-09-14 05:54:40

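The coder parameter of WriteToText expects an apache_beam.coders.Coder instance. You could try making JsonCoder inherit from the Coder base class, but I think you could also use a Map to convert the data to strings before writing: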

        def expand(self, pcoll):
            """
            PTransform Method run when called on Class Name

            :type pcoll: PCollection
            :param pcoll: A pcollection
            """
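            # Note: print_row must return its input row, otherwise the
            # following steps receive None instead of the data.
            return (pcoll
              | "print intermediate" >> beam.Map(print_row)
              | "to_json" >> beam.Map(lambda x: json.dumps(x, default=str))
              | "write data gcs" >> beam.io.WriteToText(self.bucket, file_name_suffix=".json"))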
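
For the other option mentioned above (making JsonCoder inherit from the Coder base class), here is a minimal sketch. It assumes the elements are JSON-serializable dicts, and it has encode return bytes, which is what Beam coders are expected to produce. I have not verified that this alone removes the NotImplementedError from the question. The try/except fallback to object is only there so the encode/decode logic can be tried without Beam installed; it is not part of Beam.

```python
import json

try:
    from apache_beam.coders import Coder
except ImportError:
    # Assumption for illustration only: fall back to a plain base class
    # so the encode/decode logic can be exercised without Beam installed.
    Coder = object


class JsonCoder(Coder):
    """Serializes each element as one UTF-8 encoded JSON document."""

    def encode(self, value):
        # Return bytes rather than str; default=str stringifies values
        # (e.g. datetimes) that json cannot serialize natively.
        return json.dumps(value, default=str).encode("utf-8")

    def decode(self, encoded):
        return json.loads(encoded.decode("utf-8"))

    def is_deterministic(self):
        # Output depends on dict key order, so equal dicts built in a
        # different order can encode to different bytes.
        return False


# Quick local check of the round trip, outside of any pipeline.
coder = JsonCoder()
encoded = coder.encode({"id": 1, "name": "beam"})
decoded = coder.decode(encoded)
```

With Beam installed, an instance could then be passed as coder=JsonCoder() to beam.io.WriteToText, as in the original expand method.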
Votes: 0
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/57881282
