我的工作是通过SparkSQL实现HDFS到弹性搜索的集成。我可以从HDFS读取csv数据并创建弹性搜索索引。为了创建Elastic search Index ID,我使用了csv数据中的一个唯一列。现在我的要求是弹性搜索索引ID应该是2个CSV列的组合。有没有人知道我该如何做到这一点?我正在使用elasticsearch-spark库创建索引。以下是示例代码。
SparkSession sparkSession = SparkSession.builder().config(config).getOrCreate();
SQLContext ctx = sparkSession.sqlContext();
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "hdfs://localhost:9000/test");
Dataset df = ctx.read().format("com.databricks.spark.csv").options(options).load();
JavaEsSparkSQL.saveToEs(df, "spark/test", ImmutableMap.of("es.mapping.id", "Id"));发布于 2017-09-14 01:57:08
将ID值修改为组合键,然后将数据集保存到弹性搜索中:
df.registerTempTable("tmp");
Dataset ds= spark.sql("select concat(Id,<another composite key column>) as Id ,<rest of the columns> from tmp");
JavaEsSparkSQL.saveToEs(df, "spark/test", ImmutableMap.of("es.mapping.id", "Id"));https://stackoverflow.com/questions/46192894
复制相似问题