在我的管道中,我使用WriteToBigQuery,如下所示:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)这将返回一个Dict,如文档中所述,如下所示:
beam.io.WriteToBigQuery PTransform返回一个字典,其BigQueryWriteFn.FAILED_ROWS条目包含所有未写入的行的PCollection。
如何打印这个数据集并将其转换为pcollection,或者如何打印FAILED_ROWS?
如果我做了:| "print" >> beam.Map(print)
然后我得到:AttributeError: 'dict' object has no attribute 'pipeline'
我一定读过一百条管道,但在WriteToBigQuery之后我从来没有看到过任何东西。
当我完成管道并将结果存储在变量中时,编辑,我有以下内容:
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}但是,我不知道如何在这样的管道中使用这个结果:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)发布于 2019-11-29 23:31:47
处理无效输入的死信是一种常见的Beam/Dataflow用法,可以同时使用Java和Python,但后者的示例不多。
假设我们有一些虚拟输入数据,其中有10行好的行和一个不符合表模式的坏行:
schema = "index:INTEGER,event:STRING"
data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')然后,我将写结果命名为(本例中为events):
events = (p
| "Create data" >> beam.Create(data)
| "CSV to dict" >> beam.ParDo(CsvToDictFn())
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
"{0}:dataflow_test.good_lines".format(PROJECT),
schema=schema,
)
)然后访问FAILED_ROWS端输出:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))这在DirectRunner中很好地工作,并将好的行写到BigQuery:

和一个本地文件的坏消息:
$ cat error_log.txt-00000-of-00001
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})如果您使用DataflowRunner运行它,您将需要一些额外的标志。如果遇到TypeError: 'PDone' object has no attribute '__getitem__'错误,则需要添加--experiments=use_beam_bq_sink才能使用新的BigQuery接收器。
如果您得到一个KeyError: 'FailedRows',这是因为新的接收器将默认设置来为批处理管道加载BigQuery作业:
STREAMING_INSERTS、FILE_LOADS或默认值。介绍如何将数据加载到BigQuery:https://cloud.google.com/bigquery/docs/loading-data中。DEFAULT将在流管道上使用STREAMING_INSERTS,在批处理管道上使用FILE_LOADS。
可以通过在method='STREAMING_INSERTS'中指定WriteToBigQuery来重写行为。

DirectRunner和DataflowRunner 这里的完整代码。
https://stackoverflow.com/questions/59102519
复制相似问题