首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >运行Apache流时获取Hadoop OutputFormat RunTimeException

运行Apache流时获取Hadoop OutputFormat RunTimeException
EN

Stack Overflow用户
提问于 2016-07-21 11:50:07
回答 2查看 982关注 0票数 1

我正在运行一个程序,它使用Apache从Apache集群获取数据,并将数据放在Hadoop文件中。我的计划如下:

代码语言:javascript
复制
public final class SparkKafkaConsumer {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = "Topic1, Topic2, Topic3".split(",");
        for (String topic: topics) {
            topicMap.put(topic, 3);
        }
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap);
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(",".split(x));
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();
        wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt");
        jssc.start();
        jssc.awaitTermination();
    }
}

我使用这个命令来提交应用程序:C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar

我得到了一个错误:java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)

是什么导致了这个错误,我该如何解决呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-07-21 13:49:58

我同意错误并不是真正的引人注意,但通常最好在任何saveAsHadoopFile方法中指定要输出的数据的格式,以保护自己免受此类异常的影响。

以下是文档中特定方法的原型:

代码语言:javascript
复制
saveAsHadoopFiles(java.lang.String prefix, java.lang.String suffix, java.lang.Class<?> keyClass, java.lang.Class<?> valueClass, java.lang.Class<F> outputFormatClass)

在您的例子中,这将对应:

代码语言:javascript
复制
wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class)

基于wordCounts PairDStream的格式,我选择Text作为键的String类型,选择IntWritable作为与键关联的值为Integer类型。

如果您只想要基本的纯文本文件,可以使用TextOutputFormat,但是可以查看FileOutputFormat的子类以获得更多的输出选项。

正如所问的那样,Text类来自org.apache.hadoop.io包,TextOutputFormat来自org.apache.hadoop.mapred包。

票数 4
EN

Stack Overflow用户

发布于 2016-12-15 12:48:30

只是为了完整(乔纳森给出了正确的答案)

代码语言:javascript
复制
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;

...
wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38503502

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档