首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark Streaming collect()

Spark Streaming collect()
EN

Stack Overflow用户
提问于 2017-06-29 16:40:53
回答 1查看 609关注 0票数 0

我正在使用spark streaming,我开发了以下spark streaming应用程序:

它们分别从Kafka接收器(RDD1)和HTTP请求(RDD2)创建一个DStream。

我的问题是,我只想使用RDD1中的第一个元素,并在我的RDD2中使用它,而这段代码在spark streaming (.first())中不起作用,我如何才能在spark streaming 1.6中获得相同的结果

代码:

代码语言:javascript
复制
   firstLineRDD = kvs.map(lambda x : x[0], x[1].split('\n')[0], x[2])
   dateRDD = firstLineRDD.map(lambda x : (datetime.datetime.fromtimestamp(float(x[0])/1000000),x[1],x[2]))
   dayAggRDD = dateRDD.map(lambda x : (x[0],x[1],x[2]))
   daily_date, sys , metric  = dayAggRDD.first()
   dataTSRDD = sc.parallelize(apiRequest(sys,metric,getDailyDate(daily_date)))
EN

回答 1

Stack Overflow用户

发布于 2018-03-15 20:05:48

我在这里使用了两个Kafka流,但它几乎与您的用例相同。(在java中).Have将流转换为数据集,然后您可以获取第一个数据集的第一个元素,然后使用这些值来操作第二个one.Following是代码的简要摘要:

代码语言:javascript
复制
DstreamCoord.foreachRDD(new VoidFunction<JavaRDD<Coordinates>>() {

        String valueA;
        String valueB;

        public void call(JavaRDD<Coordinates> arg0) throws Exception {
            // TODO Auto-generated method stub

        RDD<Coordinates> d = arg0.rdd();

        Dataset<Row> dataset1 = session.createDataFrame(d, Coordinates.class);

        Row firstElement = dataset1.first();
            valueA = firstElement.getString(1);
            valueB = firstElement.getString(1);

            JavaInputDStream<ConsumerRecord<String, String>> stream2 =
                      org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
                        StreamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                      );

            stream2.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String,String>>>() {

                public void call(JavaRDD<ConsumerRecord<String, String>> arg0) throws Exception {
                    // TODO Auto-generated method stub

                Dataset<Row> dataset2 =     session.createDataFrame(arg0.rdd(), Coordinates.class);



                }
            });
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44820225

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档