首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >HDFS : java.io.FileNotFoundException : File不存在: name._COPYING

HDFS : java.io.FileNotFoundException : File不存在: name._COPYING
EN

Stack Overflow用户
提问于 2017-02-04 13:45:42
回答 1查看 7.8K关注 0票数 0

我正在使用Scala处理星火流。我需要用以下行从HDFS目录中读取一个.csv文件:

代码语言:javascript
复制
 val lines = ssc.textFileStream("/user/root/")

我使用以下命令行将文件放入HDFS中:

代码语言:javascript
复制
hdfs dfs -put ./head40k.csv

对于一个相对较小的文件,它可以很好地工作。当我尝试使用更大的错误时,我会得到以下错误:

代码语言:javascript
复制
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/head800k.csv._COPYING

我能理解为什么,但我不知道怎么解决它。我也尝试过这个解决方案:

代码语言:javascript
复制
hdfs dfs -put ./head800k.csv /user
hdfs dfs -mv /usr/head800k.csv /user/root

但我的程序不读文件。有什么想法吗?提前感谢

节目:

代码语言:javascript
复制
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.sys.process._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.HashMap
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import StreamingContext._

object Traccia2014{
  def main(args: Array[String]){
if (args.length < 2) {
  System.err.println(s"""
    |Usage: DirectKafkaWordCount <brokers> <test><topicRisultato>
    |  <brokers> is a list of one or more Kafka brokers
    |  <topics> is a list of one or more kafka topics to consume from
    |
    """.stripMargin)
  System.exit(1)
}

val Array(brokers,risultato) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(5))

  val lines = ssc.textFileStream("/user/root/")

 //val lines= ssc.fileStream[LongWritable, Text, TextInputFormat](directory="/user/root/",
     // filter = (path: org.apache.hadoop.fs.Path) => //(!path.getName.endsWith("._COPYING")),newFilesOnly = true)

  //********** Definizioni Producer***********

val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

val slice=30

lines.foreachRDD( rdd => {

     if(!rdd.isEmpty){
         val min=rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
         if(!min.isEmpty){
             val ipDst= rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
             if(!ipDst.isEmpty){
                val ipSrc=rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(1)),1)).reduceByKey(_ + _)
                 if(!ipSrc.isEmpty){

                    val Rapporto=ipSrc.leftOuterJoin(ipDst).mapValues{case (x,y) => x.asInstanceOf[Int] / y.getOrElse(1) }

                    val RapportoFiltrato=Rapporto.filter{case (key, value) => value > 100 }
                    println("###(ConsumerScala) CalcoloRapporti: ###")
                    Rapporto.collect().foreach(println)
                   val str = Rapporto.collect().mkString("\n")

                      println(s"###(ConsumerScala) Produco Risultato : ${str}")

                      val message = new ProducerRecord[String, String](risultato, null, str)
                      producer.send(message)

  Thread.sleep(1000)


                 }else{
                   println("src vuoto")
            }
                 }else{
                    println("dst vuoto")
             }
             }else{
                println("min vuoto")
            }
                }else
                { 
                 println("rdd vuoto")
              }

              })//foreach


ssc.start()
ssc.awaitTermination()


} }
EN

回答 1

Stack Overflow用户

发布于 2017-02-04 13:53:42

/user/root/head800k.csv._COPYING是一个临时文件,它是在复制过程进行时创建的。等待复制过程完成,如果没有_COPYING后缀,即/user/root/head800k.csv,就会失败。

要在火花流作业中过滤这些瞬态,可以使用fileStream方法文档化的这里,如下所示

代码语言:javascript
复制
 ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory="/user/root/",
      filter = (path: org.apache.hadoop.fs.Path) => (!path.getName.endsWith("_COPYING")), // add other filters like files starting with dot etc
      newFilesOnly = true)

编辑

由于要将文件从本地文件系统移动到HDFS,最好的解决方案是将文件移动到HDFS中的临时暂存位置,然后将它们移动到目标目录。在HDFS文件系统中复制或移动应避免瞬态文件。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42041110

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档