neo4j with Flink and Scala

Stack Overflow user
Asked 2017-08-31 16:24:44
1 answer, 532 views, 0 votes

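I am processing data with Scala 2.11.7 and Flink 1.3.2. Now I would like to store the resulting org.apache.flink.api.scala.DataSet in a neo4j graph database.

There are some GitHub projects that aim to provide this compatibility.

Which approach is the most promising? Or should I use neo4j's REST API directly?

(By the way: why does Stack Overflow limit the number of links?)

I tried flink-neo4j, but there seem to be some problems when mixing Java and Scala classes: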

package dummy.neo4j

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.{Tuple, Tuple2}
import org.apache.flink.api.scala._

object Neo4jDummyWriter {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/")
  .setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
  .addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish

    val tuple1: Tuple = new Tuple2("abc", 1)
    val tuple2: Tuple = new Tuple2("def", 2)

    val test = env.fromElements[Tuple](tuple1, tuple2)
    println("test: " + test.getClass)
    test.output(outputFormat)
  }

}

This fails with a cast exception in thread "main" that mentions [Lorg.apache.flink.api.common.typeinfo.TypeInformation; and points at:
    at dummy.neo4j.Neo4jDummyWriter$.main(Neo4jDummyWriter.scala:20)
    at dummy.neo4j.Neo4jDummyWriter.main(Neo4jDummyWriter.scala)

Type mismatch, expected: OutputFormat[Tuple], actual: OutputFormat[_ <: Tuple]

1 Answer

Stack Overflow user

Accepted answer

Posted 2017-09-15 15:19:09

The solution is not to widen the Tuple2 objects to Tuple, but to keep the concrete Tuple2 type throughout:

package dummy.neo4j

import org.apache.flink.api.common.io._
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._

object Neo4jDummyWriter {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Start from Scala tuples, then map them to Flink's Java Tuple2,
    // keeping the concrete type Tuple2[String, Int] rather than Tuple.
    val tuple1 = ("user9", 1978)
    val tuple2 = ("user10", 1996)
    val datasetWithScalaTuples = env.fromElements(tuple1, tuple2)
    val dataset: DataSet[Tuple2[String, Int]] =
      datasetWithScalaTuples.map(tuple => new Tuple2(tuple._1, tuple._2))

    // Cast the built format to the dataset's element type so that output() accepts it.
    val outputFormat = Neo4jOutputFormat.buildNeo4jOutputFormat
      .setRestURI("http://localhost:7474/db/data/")
      .setUsername("neo4j")
      .setPassword("...")
      .setConnectTimeout(1000)
      .setReadTimeout(1000)
      .setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})")
      .addParameterKey(0, "name")
      .addParameterKey(1, "born")
      .setTaskBatchSize(1000)
      .finish
      .asInstanceOf[OutputFormat[Tuple2[String, Int]]]

    dataset.output(outputFormat)
    env.execute()
  }

}
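
The type mismatch reported in the question is what Scala does with any invariant generic type: a format typed on "some subtype of Tuple" is not accepted where a format for one specific element type is required. A minimal sketch of that behaviour, using hypothetical stand-in types (Base, Pair, Format and VarianceSketch are illustrations only, not Flink or flink-neo4j classes), might look like this:

```scala
// Hypothetical stand-ins, not Flink classes: Base plays the role of Tuple,
// Pair the role of Tuple2, Format the role of OutputFormat.
abstract class Base
final case class Pair(name: String, born: Int) extends Base

// Like a Java generic interface, Format is invariant in T.
trait Format[T] {
  def write(record: T): String
}

object VarianceSketch {
  // A builder that only promises "a format for some subtype of Base".
  def build(): Format[_ <: Base] = new Format[Pair] {
    def write(record: Pair): String = s"${record.name}:${record.born}"
  }

  // Accepts only a format whose type parameter matches the records exactly.
  def output[T](records: Seq[T], format: Format[T]): Seq[String] =
    records.map(format.write)

  def main(args: Array[String]): Unit = {
    val records = Seq(Pair("user9", 1978), Pair("user10", 1996))

    // output(records, build()) would not compile:
    // Format[_ <: Base] does not conform to Format[Pair].

    // An unchecked cast to the concrete element type satisfies the compiler.
    // It is only safe here because build() really produces a Format[Pair].
    val format = build().asInstanceOf[Format[Pair]]
    output(records, format).foreach(println)
  }
}
```

The cast is unchecked at runtime because of type erasure, so it only moves the responsibility for matching the types from the compiler to the author. That is presumably why the answer pairs it with a dataset that is explicitly typed as DataSet[Tuple2[String, Int]].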
Votes: 1
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/45986124