SparkStreaming: error in fileStream()

Stack Overflow user
Asked 2015-10-12 08:19:29
Answers: 1 · Views: 1.6K · Followers: 0 · Votes: 4

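I am trying to implement a Spark Streaming application in Scala. I want to use the fileStream() method to process newly arriving files as well as older files already present in a Hadoop directory.

I followed the fileStream() implementations from two Stack Overflow threads.

I am calling fileStream() as follows: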

val linesRDD = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory, (t: org.apache.hadoop.fs.Path) => true, false).map(_._2.toString)

But I get the following error message:

type arguments [org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,
org.apache.hadoop.mapred.TextInputFormat] conform to the bounds of none of the overloaded alternatives of value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> 
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:
String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], 
implicit evidence$11: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)]

wrong number of type parameters for overloaded method value fileStream with alternatives: 
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <:     org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], implicit evidence$11: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> 
[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
org.apache.spark.streaming.dstream.InputDStream[(K, V)] 

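I am using Spark 1.4.1 with Hadoop 2.7.1. Before posting this question I looked at the different implementations discussed on Stack Overflow and at the Spark documentation, but nothing helped. Any help would be appreciated.

Thanks, Rajneesh.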


1 Answer

Stack Overflow user

Accepted answer

Posted 2015-10-16 07:02:42

Please find the correct imports in the Java code sample below; it works fine for me.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2; // needed for the Tuple2<LongWritable, Text> used in the map below

JavaStreamingContext jssc = SparkUtils.getStreamingContext("key", jsc);
//      JavaDStream<String> rawInput = jssc.textFileStream(inputPath);

        JavaPairInputDStream<LongWritable, Text> inputStream = jssc.fileStream(
                inputPath, LongWritable.class, Text.class,
                TextInputFormat.class, new Function<Path, Boolean>() {
                    @Override
                    public Boolean call(Path v1) throws Exception {
                        if ( v1.getName().contains("COPYING") ) {
                            // This eliminates staging files.
                            return Boolean.FALSE;
                        }
                        return Boolean.TRUE;
                    }
                }, true);
        JavaDStream<String> rawInput = inputStream.map(
                  new Function<Tuple2<LongWritable, Text>, String>() {
                    @Override
                    public String call(Tuple2<LongWritable, Text> v1) throws Exception {
                      return v1._2().toString();
                    }
                });
        log.info(tracePrefix + "Created the stream, Window Interval: " + windowInterval + ", Slide interval: " + slideInterval);
        rawInput.print();
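
The error message in the question names `org.apache.hadoop.mapred.TextInputFormat` (the old Hadoop API), while every `fileStream` overload listed requires `F <: org.apache.hadoop.mapreduce.InputFormat[K,V]` (the new API). The import used in the Java sample above, `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, satisfies that bound. A minimal Scala sketch of the same fix might look like the following; the object name, application name, input path, and batch interval are assumptions made for illustration, and the filter simply mirrors the "COPYING" check from the Java sample.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
// New-API input format (mapreduce package), not org.apache.hadoop.mapred.TextInputFormat.
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; replace with your own directory and settings.
    // The master URL is expected to be supplied by spark-submit.
    val inputDirectory = "hdfs:///tmp/streaming-input"
    val conf = new SparkConf().setAppName("FileStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](
        inputDirectory,
        // Skip staging files, as in the Java sample above.
        (path: Path) => !path.getName.contains("COPYING"),
        false) // newFilesOnly = false, as in the question
      .map { case (_, text) => text.toString }

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the `mapreduce` import in place, the type arguments conform to the bound shown in the compiler output, so the call in the question should resolve to the three-argument overload.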
Votes: 4
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/33076339