在独立的星星之火中,我正在尝试从一个dataframe写到Elasticsearch。虽然我可以做到这一点,但我无法理解的是如何写入一个动态命名的索引,该索引格式为‘index _name-{ts_col:{YYYY dd}’,其中'ts_col‘是数据集中的一个日期时间字段。
我见过各种各样的帖子说语法类型应该有效,但是当我尝试时,我会得到底部包含的错误。在创建索引之前,它似乎首先检查索引是否存在,但它传递的是未格式化的索引名,而不是动态创建的索引名称。我已经尝试使用模块使用相同的语法创建索引,但是它不能处理动态索引名。
是否有任何解决方案可供我使用,或者我是否必须在spark中循环我的数据集以找到所表示的每个日期,创建我需要的索引,然后一次写到每个索引?我漏掉了什么明显的东西吗?洛格斯塔什很容易做到这一点,我不明白为什么我不能让它在星火中工作。
下面是我使用的写命令(也尝试了不同的变体):
df.write.format("org.elasticsearch.spark.sql")
.option('es.index.auto.create', 'true')
.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
.option('es.mapping.id', 'es_id')
.save()这是我用的罐子:
elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar下面是我使用上面的写命令时遇到的错误:
错误NetworkClient:节点##.##.##.##:9200失败(无效的目标URI ##.##.##.##:9200选择了下一个节点##.##.##.##:9200 ..。 ..。 Py4JJavaError:调用o114.save时出错。::org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException:连接错误(检查网络和/或代理设置)-所有节点都失败;
如果我给True设定了覆盖,我得到:
Py4JJavaError:调用o58.save时出错。:org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:在org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) at org没有这样的索引为null.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442)在org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(方法)py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280)在py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214)在java.lang.Thread.run(Thread.java:745)
如果我试图使用Elasticsearch客户机提前创建索引:
RequestError: TransportError(400,u‘无效_ index _ name _TransportError’,u‘无效索引名index_name-{ts_col:YYYY.MM.dd},必须是小写)
发布于 2017-02-25 11:20:11
您不需要再次将日期格式放在大括号中。你可以读更多关于它的这里
.option(‘es.resources’,'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
如下图所示,更改上述内容:
.option('es.resource', 'index_name-{ts_col:YYYY.mm.dd}/type_name')注意:确保您的ts_col字段有正确的日期格式。
https://stackoverflow.com/questions/42445904
复制相似问题