我想把spark-streaming转换成几个弹性搜索索引。我创建了成对的<key(index), value>,当我执行groupByKey时,结果是<key(index), Iterable<value>>的元组,但是为了使用elasticsearch-spark插件保存到elasticsearch,我需要将值作为JavaRDD<value>。
我知道有一个从list创建JavaRDD的sparkContext.parallelize(list)选项,但它只能在驱动程序上执行。
有没有其他选项可以创建可以在executor上执行的JavaRDD?或者我可以用另一种方式来实现在executor上工作的Tuple2<key(index), JavaRDD<value>>?如果不是,我如何才能在驱动程序上将迭代器切换到JavaRDD,并在执行器中将插件写入elasticsearch?
谢谢,
丹妮拉
发布于 2016-08-09 06:50:08
我想说的是,像下面这样的smth是有可能的
JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2());
JavaRDD<Value> onlyValues = values.flatMap(it -> it);另一种方法是
JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1);
JavaRDD<Value> values = keyValues.map(t2 -> t2._2());https://stackoverflow.com/questions/38823176
复制相似问题