首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala:类型不匹配 MapFunction[Tuple2[Text, Text], NotInferedR]

Scala:类型不匹配 MapFunction[Tuple2[Text, Text], NotInferedR]
EN

Stack Overflow用户
提问于 2016-07-07 16:35:04
回答 0查看 575关注 0票数 2

我尝试做以下几件事:

代码语言:scala
复制
env
  .readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath)
  .map(tp => tp)

但是在我的编辑器中出现了一个类型不匹配的错误:

代码语言:文本(编译器错误信息)
复制
Expected: MapFunction[Tuple2[Text, Text], NotInferedR], actual: (Nothing) => Nothing

我该如何解决这个问题呢?

这是完整的代码:

代码语言:scala
复制
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job

/** Flink partitioner for [[OptimizedText]] keys that delegates the partition
  * decision to a wrapped Hadoop-style [[TotalOrderPartitioner]].
  *
  * Note: the `numPartitions` argument supplied by Flink is not consulted;
  * the underlying total-order partitioner alone determines the target.
  *
  * @param underlying total-order partitioner built from the partition file
  */
class OptimizedFlinkTeraPartitioner(underlying:TotalOrderPartitioner) extends Partitioner[OptimizedText] {
  def partition(key:OptimizedText, numPartitions:Int):Int = {
    // Unwrap the raw Text key and let the total-order partitioner place it.
    val rawKey = key.getText()
    underlying.getPartition(rawKey)
  }
}


object FlinkTeraSort {

  // Total ordering for Hadoop Text keys, delegating to Text.compareTo.
  // NOTE(review): no explicit sort call is visible in this snippet;
  // presumably this implicit is meant for Flink's ordering machinery — confirm.
  implicit val textOrdering = new Ordering[Text] {
    override def compare(a:Text, b:Text) = a.compareTo(b)
  }

  /** Entry point: reads TeraSort records from HDFS via TeraInputFormat,
    * and writes them back out through TeraOutputFormat as a Flink job.
    *
    * Expected arguments: hdfs URI, input path, output path, #partitions.
    */
  def main(args: Array[String]){
    if(args.size != 4){
      println("Usage: FlinkTeraSort hdfs inputPath outputPath #partitions ")
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    // Reuse objects between user functions to reduce allocation overhead.
    env.getConfig.enableObjectReuse()

    // Positional CLI arguments; paths are prefixed with the HDFS URI.
    val hdfs = args(0)
    val inputPath= hdfs+args(1)
    val outputPath = hdfs+args(2)
    val partitions = args(3).toInt

    // Hadoop configuration consumed by TeraInputFormat / TeraOutputFormat.
    val mapredConf = new JobConf()
    mapredConf.set("fs.defaultFS", hdfs)
    mapredConf.set("mapreduce.input.fileinputformat.inputdir", inputPath)
    mapredConf.set("mapreduce.output.fileoutputformat.outputdir", outputPath)
    mapredConf.setInt("mapreduce.job.reduces", partitions)

    // Sample the input and write the partition-boundaries file that the
    // total-order partitioner reads.
    val partitionFile = new Path(outputPath, TeraInputFormat.PARTITION_FILENAME)
    val jobContext = Job.getInstance(mapredConf)
    TeraInputFormat.writePartitionFile(jobContext, partitionFile)
    val partitioner = new OptimizedFlinkTeraPartitioner(new TotalOrderPartitioner(mapredConf, partitionFile))

    val data = env.readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath)

    // NOTE(review): this is the line the reported type error points at.
    // Because the *Java* ExecutionEnvironment is imported above
    // (org.apache.flink.api.java.ExecutionEnvironment), `map` expects a
    // MapFunction[Tuple2[Text, Text], R]; a plain Scala lambda does not
    // convert, hence "Expected: MapFunction[...], actual: (Nothing) => Nothing".
    // Using the Scala API (org.apache.flink.api.scala._) would accept a lambda.
    // Also note the mapped dataset is discarded — `data` below is the
    // unmapped source, and `partitioner` is never used in this snippet.
    data.map(tp => tp)

    data.output(new HadoopOutputFormat[Text, Text](new TeraOutputFormat(), jobContext))

    env.execute("TeraSort")
  }
}

(build.sbt):

代码语言:scala (sbt)
复制
// sbt build definition for the TeraSort job.
name := "terasort"

version := "0.0.1"

// Declare both dependencies in a single setting instead of repeated `+=`.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.flink" %% "flink-clients" % "1.0.3"
)

// Run the application in a forked JVM rather than inside the sbt process.
fork in run := true
EN

回答

页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38240903

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档