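I am processing data with Scala 2.11.7 and Flink 1.3.2. Now I would like to store the resulting org.apache.flink.api.scala.DataSet in a Neo4j graph database.
There are a few GitHub projects aimed at compatibility:
Which approach is the most promising? Or should I use Neo4j's REST API directly?
(By the way: why does Stack Overflow limit the number of links?)
I tried flink-neo4j, but there seem to be some problems when mixing Java and Scala classes: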
package dummy.neo4j

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.{Tuple, Tuple2}
import org.apache.flink.api.scala._

object Neo4jDummyWriter {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Output format typed with a wildcard over the Java Tuple base class
    val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat
      .setRestURI("http://localhost:7474/db/data/")
      .setConnectTimeout(1000)
      .setReadTimeout(1000)
      .setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name")
      .addParameterKey(1, "born")
      .setTaskBatchSize(1000)
      .finish

    // Java Tuple2 values widened to the abstract Tuple type
    val tuple1: Tuple = new Tuple2("abc", 1)
    val tuple2: Tuple = new Tuple2("def", 2)
    val test = env.fromElements[Tuple](tuple1, tuple2)
    println("test: " + test.getClass)
    test.output(outputFormat)
  }
}

This produces:

Exception in thread "main" [...] cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
    at dummy.neo4j.Neo4jDummyWriter$.main(Neo4jDummyWriter.scala:20)
    at dummy.neo4j.Neo4jDummyWriter.main(Neo4jDummyWriter.scala)
and

Type mismatch, expected: OutputFormat[Tuple], actual: OutputFormat[_ <: Tuple]
Posted on 2017-09-15 15:19:09
The solution is not to widen the Tuple2 objects to Tuple, but to keep the concrete Tuple2 type:
package dummy.neo4j

import org.apache.flink.api.common.io._
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._

object Neo4jDummyWriter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Start from plain Scala tuples
    val tuple1 = ("user9", 1978)
    val tuple2 = ("user10", 1996)
    val datasetWithScalaTuples = env.fromElements(tuple1, tuple2)

    // Convert each Scala tuple to a Flink Java Tuple2, keeping the concrete type
    val dataset: DataSet[Tuple2[String, Int]] =
      datasetWithScalaTuples.map(tuple => new Tuple2(tuple._1, tuple._2))

    // Build the output format and cast it to the element type of the DataSet
    val outputFormat = Neo4jOutputFormat.buildNeo4jOutputFormat
      .setRestURI("http://localhost:7474/db/data/")
      .setUsername("neo4j")
      .setPassword("...")
      .setConnectTimeout(1000)
      .setReadTimeout(1000)
      .setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name")
      .addParameterKey(1, "born")
      .setTaskBatchSize(1000)
      .finish
      .asInstanceOf[OutputFormat[Tuple2[String, Int]]]

    dataset.output(outputFormat)
    env.execute()
  }
}

https://stackoverflow.com/questions/45986124
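The type mismatch from the question and the asInstanceOf cast in the answer can be reproduced without Flink. The sketch below is only an illustration: Base, Pair, Sink, buildSink and emit are hypothetical stand-ins for Flink's Tuple, Tuple2, OutputFormat, the Neo4j builder and DataSet.output, not the real API. It assumes the real classes behave like ordinary Java generics, which Scala treats as invariant and which are erased at runtime.

```scala
// Hypothetical stand-ins for Flink's Tuple and Tuple2 (not the real classes).
abstract class Base { def field(i: Int): Any }
final case class Pair[A, B](a: A, b: B) extends Base {
  def field(i: Int): Any = if (i == 0) a else b
}

// Invariant in T, the way Scala sees a Java generic interface.
trait Sink[T] { def write(record: T): String }

object ErasureCastDemo {
  // A builder that only promises "a sink for some subtype of Base".
  def buildSink(): Sink[_ <: Base] = new Sink[Base] {
    def write(record: Base): String = s"${record.field(0)}:${record.field(1)}"
  }

  // A consumer that requires the sink's type parameter to match the records exactly.
  def emit[T](records: Seq[T], sink: Sink[T]): Seq[String] = records.map(sink.write)

  def run(): Seq[String] = {
    val records = Seq(Pair("user9", 1978), Pair("user10", 1996))
    // emit(records, buildSink()) is rejected by the compiler:
    // Sink[_ <: Base] is not a Sink[Pair[String, Int]].
    // The cast compiles and succeeds at runtime because type arguments are erased;
    // it is unchecked, so it is only safe if the sink really accepts these records.
    val sink = buildSink().asInstanceOf[Sink[Pair[String, Int]]]
    emit(records, sink)
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because the cast is unchecked, a wrong type argument would not fail at the cast itself but later, when a record of the wrong shape is written. Keeping the DataSet's element type and the cast's type argument identical, as the answer does, avoids that.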