我有一个管道,它产生一个数据流图(序列化的JSON表示),它超过了API允许的限制,因此不能像通常那样通过apache beam的dataflow runner启动。并且使用指示的参数--experiments=upload_graph运行dataflow runner不起作用,并且失败,说明没有指定步骤。
通过错误获得有关此大小问题的通知时,将提供以下信息:
the size of the serialized JSON representation of the pipeline exceeds the allowable limit for the API.
Use experiment 'upload_graph' (--experiments=upload_graph)
to direct the runner to upload the JSON to your
GCS staging bucket instead of embedding in the API request.现在使用此参数,确实会导致dataflow runner将额外的dataflow_graph.pb文件上载到通常的pipeline.pb文件旁边的登台位置。我证实它确实存在于gcp存储中。
但是,gcp数据流中的作业在启动后立即失败,并显示以下错误:
Runnable workflow has no steps specified.我已经在不同的管道中尝试了这个标志,甚至是apache beam示例管道,并且看到了相同的行为。
这可以通过使用字数统计示例来重现:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.11.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=falsecd word-count-beam/在不使用experiments=upload_graph参数的情况下运行它是可行的:(确保指定您的项目,如果您想要运行它,请指定buckets )
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
--gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner使用experiments=upload_graph运行它会导致管道失败,并显示消息workflow has no steps specified
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
--gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--experiments=upload_graph \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner现在我希望dataflow runner会指示gcp dataflow从指定的存储桶中读取步骤,如源代码中所示:
然而,情况似乎并非如此。有没有人让这个功能正常工作,或者找到了一些关于这个特性的文档,可以为我指明正确的方向?
发布于 2019-04-27 05:06:32
实验已经恢复,消息传递将在光束2.13.0中进行更正
恢复PR
发布于 2020-07-10 04:55:41
我最近遇到了这个问题,解决方案非常愚蠢。我开发了一个相当复杂的数据流作业,它工作得很好,第二天停止工作,出现错误"Runnable workflow has no specified“。在我的例子中,有人在创建选项后指定了pipeline().run().waitUntilFinish()两次,因此,我得到了这个错误。删除重复的管道运行解决了此问题。我仍然认为在这种情况下,beam/dataflowrunner应该有一些有用的错误跟踪。
https://stackoverflow.com/questions/55828681
复制相似问题