I am using Apache Beam to write JSON to GCS, but I am running into the following error:
NotImplementedError: offset: 0, whence: 0, position: 50547, last: 50547 [while running 'Writing new data to gcs/write data gcs/Write/WriteImpl/WriteBundles/WriteBundles']
I don't know why this error occurs. The code in question is below:
class WriteDataGCS(beam.PTransform):
    """
    To write data to GCS.
    """
    def __init__(self, bucket):
        """
        Initiate the bucket as a class field.
        :type bucket: string
        :param bucket: GCS path to write the data to
        """
        self.bucket = bucket

    def expand(self, pcoll):
        """
        PTransform method run when the class is applied to a PCollection.
        :type pcoll: PCollection
        :param pcoll: A PCollection
        """
        (pcoll | "print intermediate" >> beam.Map(print_row))
        return (pcoll | "write data gcs" >> beam.io.WriteToText(self.bucket, coder=JsonCoder(), file_name_suffix=".json"))


class JsonCoder:
"""
This class represents dump and load operations performed on json
"""
def encode(self,data):
"""
Encodes the json data.
:type data: string
:param data: Data to be encoded
"""
# logger.info("JSON DATA for encoding - {}".format(data))
return json.dumps(data,default=str)
def decode(self,data):
"""
Decodes the json data.
:type data: string
:param data: Data to be decoded
"""
# logger.info("JSON DATA for decoding - {}".format(data))
return json.loads(data)发布于 2019-09-14 05:54:40
The `coder` argument of `WriteToText` expects an `apache_beam.coders.Coder` instance. You could make `JsonCoder` inherit from the `Coder` base class, but I think you can also just use a `Map` to convert the data to a string:
def expand(self, pcoll):
    """
    PTransform method run when the class is applied to a PCollection.
    :type pcoll: PCollection
    :param pcoll: A PCollection
    """
    (pcoll | "print intermediate" >> beam.Map(print_row))
    return (pcoll
            | "to_json" >> beam.Map(lambda x: json.dumps(x, default=str))
            | "write data gcs" >> beam.io.WriteToText(self.bucket, file_name_suffix=".json"))

https://stackoverflow.com/questions/57881282