首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Java中迭代DStream

如何在Java中迭代DStream
EN

Stack Overflow用户
提问于 2016-05-22 10:09:10
回答 1查看 4.5K关注 0票数 2

我是星火编程的新手。我有一个火花流程序,它需要将接收到的DStream存储到数据库中,我想迭代我的数据流并将每条记录存储到数据库中。

就像这样。

代码语言:javascript
复制
JavaStreamingContext streamingContext = getSparkStreamingContext();

JavaReceiverInputDStream<String> socketTextStream = streamingContext
                .socketTextStream("localhost", 8080);

DStream<String> dstream = socketTextStream.dstream();

// Iterate each record from the DStream and push it to DB

方法2 :

这样做对吗?这种方法会带来任何性能增益/问题吗?

代码语言:javascript
复制
socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {

        List<String> collect = rdd.collect();

        for (String string : collect) {

            System.out.println(string);
        }

        return null;
    }
});
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-05-22 10:16:42

您可以使用JavaDStream.foreachRDDJavaRDD.foreach

代码语言:javascript
复制
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
        .socketTextStream("localhost", 8080);

socketTextStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                // Save data
            }
        });
    }
});

或使用Java 8 Lambda表达式

代码语言:javascript
复制
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
        .socketTextStream("localhost", 8080);

socketTextStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
    rdd.foreach((VoidFunction<String>) s -> {
        // Save data
    });
});

编辑

既然您使用的是Spark1.2.0(这有点老了,我建议升级一下(最新版本是1.6.1,截止到2016年5月22日)):

代码语言:javascript
复制
socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                // Save data
            }
        });
        return null;
    }
});
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37372967

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档