I want to know, on GAE, when a Dataflow job has completed.
I tried building the following two pipelines:
1.
| 'write to bigquery' >> beam.io.WriteToBigQuery(...)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic')
2.
| 'write to bigquery' >> beam.io.WriteToBigQuery(...)
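| 'DoPubSub' >> beam.ParDo(DoPubSub()) # publish using google.cloud.pubsub
However, both of the above produce the following error:
AttributeError: 'PDone' object has no attribute 'windowing'
How can I run a step after WriteToBigQuery?
Note: I launch the Dataflow job from a template via REST, so I cannot use pipeline_result.wait_until_finish().
Edit
The full stack trace is here.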
File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
vital_data_export()
File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
result = p.run()
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
return self.runner.run_pipeline(self)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
return_context=True)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
root_transform_id = context.transforms.get_id(self._root_transform())
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
for part in self.parts],
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
for tag, out in self.named_outputs().items()},
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
for tag, out in self.named_outputs().items()},
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
self.windowing))
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
self.producer.inputs)
File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'
Posted on 2018-09-07 19:18:40
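You can't.
PDone marks the last stage of a pipeline, so there is nothing after it to wait on, and no further transform can be applied to it.
PInput and PDone are Apache Beam classes that represent a source and a sink, respectively. If you need to do something after the BigQuery write, that is not possible unless you run two separate Dataflow jobs in series.
If you want to run them in series, take a look at Apache Airflow.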
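If a full orchestrator is more than you need, the follow-up step can also be triggered from outside the pipeline by polling the job's state until it is terminal. Below is a minimal sketch of that idea, not a definitive implementation. `fetch_state` is a hypothetical callable you would supply (for example, a wrapper around the Dataflow REST API's `projects.locations.jobs.get` that returns the job's `currentState`), and the set of terminal states is an assumption that should be checked against the current API documentation.

```python
import time

# Assumed terminal values of a Dataflow job's `currentState` field.
# Verify this list against the Dataflow REST API's JobState enum.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def wait_for_job(fetch_state, poll_seconds=30.0, timeout_seconds=3600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll until the job reaches a terminal state, then return that state.

    fetch_state is a hypothetical zero-argument callable that returns the
    job's current state as a string. sleep and clock are injectable so the
    loop can be exercised without real waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError("job still in state %s" % state)
        sleep(poll_seconds)
```

The caller would publish to Pub/Sub (or do whatever comes next) only when the returned state is `JOB_STATE_DONE`. On GAE, where a request cannot block for long, the same check could instead run once per invocation of a scheduled task rather than inside a loop.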
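Posted on 2018-10-01 20:29:52
In Java, this is what I did to publish a "done" event to PubSub at the end of a Dataflow pipeline whose output is written to BigQuery. Hopefully there is something similar in Python.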
// Imports used by the snippet below
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

PCollection<TableRow> rows = data.apply("ConvertToTableRow", ParDo.of(new ConvertToRow()));
// Normally this would be the end of the pipeline..
WriteResult writeResult = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(...));
// Transformations after this will be done AFTER all rows have been written to BQ
rows.apply(Wait.on(writeResult.getFailedInserts()))
    // Transforms each row inserted to an Integer of value 1
    .apply("OnePerInsertedRow", ParDo.of(new DoFn<TableRow, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(Integer.valueOf(1));
        }
    }))
    // https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L51
    // Combines a PCollection of Integers (all 1's) by summing them.
    // Outputs a PCollection of one integer element with the sum
    .apply("SumInsertedCounts", Sum.integersGlobally())
    // Wraps the row count in a single completion message
    .apply("CountsMessage", ParDo.of(new DoFn<Integer, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = "pipeline_completed";
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rows_written", c.element().toString());
            PubsubMessage message = new PubsubMessage(messagePayload.getBytes(StandardCharsets.UTF_8), attributes);
            c.output(message);
        }
    }))
    .apply("PublishCompletionMessage", PubsubIO.writeMessages().to(/* output topic */));
https://stackoverflow.com/questions/51085326