首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从发布/订阅到ElasticSearch的Apache光束流

从发布/订阅到ElasticSearch的Apache光束流
EN

Stack Overflow用户
提问于 2019-11-21 02:41:45
回答 1查看 972关注 0票数 0

我正在编写一个ElasticSearch streaming管道,它从Google Cloud PubSub中读取消息,并将其写入java实例。目前,我使用的是direct runner,但计划在Google Cloud Dataflow上部署该解决方案。

首先,我编写了一个从PubSub读取和写入文本文件的管道,它可以正常工作。然后,我启动了ElasticSearch实例,这也可以正常工作。我用curl写了一些文档,很简单。

然后,当我尝试使用Beam的ElasticSearch连接器执行写操作时,我开始遇到一些错误。实际上,尽管我在pom.xml文件上添加了依赖项,但我还是得到了ava.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest

我所做的基本上是这样的:

代码语言:javascript
复制
messages.apply(
                        "TwoMinWindow",
                        Window.into(FixedWindows.of(new Duration(120*1000)))
                ).apply(
                        "ElasticWrite",
            ElasticsearchIO.write()
            .withConnectionConfiguration(
                             ElasticsearchIO.ConnectionConfiguration
                             .create(new String[]{"http://xxx.xxx.xxx.xxx:9200"}, "streaming_data", "string")
                             .withUsername("xxxx")
                             .withPassword("xxxxxxxx")
                             )
                );

使用DirectRunner,我可以连接到PubSub,但是当管道尝试连接ElasticSearch实例时,我得到了一个异常:

代码语言:javascript
复制
java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;
    at org.apache.beam.sdk.util.UserCodeException.wrap (UserCodeException.java:34)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup (Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor (DoFnInvokers.java:50)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load (DoFnLifecycleManager.java:104)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load (DoFnLifecycleManager.java:91)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture (LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync (LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad (LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get (LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get (LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad (LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get (LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get (DoFnLifecycleManager.java:61)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator (ParDoEvaluatorFactory.java:129)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication (ParDoEvaluatorFactory.java:79)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication (TransformEvaluatorRegistry.java:169)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion (ElasticsearchIO.java:1348)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup (ElasticsearchIO.java:1200)

我在pom.xml中添加的内容是:

代码语言:javascript
复制
    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>

    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elastic.version}</version>
  </dependency>

我被这个问题困住了,我不知道如何解决它。如果我使用JestClient,我可以毫无问题地连接到ElasticSearch。

你有什么建议吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-11-21 02:51:09

您正在使用没有performRequest(String, Header)方法的较新版本的RestClient。如果您查看the latest source code,您可以看到该方法现在采用Request,而在旧版本中采用there were methods that took Strings and Headers。这些方法被弃用,然后是removed from the code on September 1, 2018

要么更改您的代码以使用更新的弹性搜索库,要么指定与您的代码兼容的旧版本的库(它需要在7.0.x之前,例如6.8.4)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58961289

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档