我们运行了一个Google批处理作业,它将数据记录写入一个pubsub管道,并有一个单独的流作业,它使用“pubsub to elasticsearch”流模板从PubSub管道中提取数据,并将更新写入我们的Elasticsearch索引。但是,我们不得不终止写入Elastic的流作业,因为在1次尝试之后,我们遇到了多个“向ES写入错误”。再也不允许“错误”了。
从PubSub读取并写入Elasticsearch的任务是写入不是流索引的索引。它实际上需要在索引中更新相同的文档,可能会多次更新。
这项工作需要写入流索引吗?如果不是,我们如何配置数据流作业,使其实际上放慢速度,而不是压倒弹性?
我已经看到,增加thread_pool.index.bulk.queue_size是可以使用的,但不建议使用。
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Error writing to ES after 1 attempt(s). No more attempts allowed
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBundleFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1751)
org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:111)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:538)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Error writing to ES after 1 attempt(s). No more attempts allowed
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.handleRetry(ElasticsearchIO.java:2569)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushBatch(ElasticsearchIO.java:2519)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2435)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.finishBundle(ElasticsearchIO.java:2396)发布于 2022-09-15 23:33:01
来自模板的官方文件:
Dataflow模板使用Elasticsearch的数据流特性来存储跨多个索引的时间序列数据,同时为请求提供一个命名资源
您可以使用一些参数来控制这一点,听起来您会想要更改默认值:
bulkInsertMethod(可选)是使用INDEX(索引,允许上服务器)还是CREATE(创建,重复_id上的错误)与Elasticsearch批量请求。默认值:CREATE。usePartialUpdate(可选)是否在Elasticsearch请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false。maxRetryAttempts(可选)最大重试尝试,必须是>0。默认情况:不重试。
https://stackoverflow.com/questions/73737323
复制相似问题