首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在pyspark中使用foreachRDD和foreach遍历rdd

在pyspark中使用foreachRDD和foreach遍历rdd
EN

Stack Overflow用户
提问于 2016-05-28 05:19:32
回答 1查看 28.7K关注 0票数 1

Spark 1.6.1,pyspark的问题

我收到的流媒体数据就像

代码语言:javascript
复制
{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}

我有一个名为writeToHBase( rdd )的写入HBase的函数,期望得到一个具有以下结构的元组的rdd:

代码语言:javascript
复制
(rowkey, [rowkey, column-family, key, value])

正如您从输入格式中看到的,我必须获取原始数据集并迭代所有键,使用send函数调用发送每个键/值对。

阅读spark streaming编程指南中的“使用foreachRDD的设计模式”一节http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13

在数据集之外做一些事情时,似乎建议使用foreachRDD。在我的例子中,我希望通过网络将数据写入HBase,因此我对我的流数据使用foreachRDD,并调用将处理数据发送的函数:

代码语言:javascript
复制
stream.foreachRDD(lambda k: process(k))

我现在对spark函数的理解非常有限,所以我无法想出一种方法来迭代我的原始数据集来使用我的write函数。如果它是一个python iterable,我可以这样做:

代码语言:javascript
复制
def process(rdd):
    for key, value in my_rdd.iteritems():
        writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))

其中rowkey是通过在rdd本身中找到它来获得的。

代码语言:javascript
复制
rdd.map(lambda x: x['rowkey'])

如何在pyspark中完成process()要做的事情?我看到一些使用foreach的例子,但我不能让它做我想做的事情。

EN

回答 1

Stack Overflow用户

发布于 2016-05-28 14:05:02

当你的writeToHBase函数需要一个rdd作为论据时,你为什么要迭代rdd呢?只需在流程函数中调用writeToHBase(rdd),就行了。

如果需要从rdd获取每条记录,可以调用

代码语言:javascript
复制
def processRecord(record):
        print(record)   
rdd.foreach(processRecord)

在processRecord函数中,您将获得要处理的单个记录。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37492402

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档