Spark 1.6.1,pyspark的问题
我收到的流媒体数据就像
{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}我有一个名为writeToHBase( rdd )的写入HBase的函数,期望得到一个具有以下结构的元组的rdd:
(rowkey, [rowkey, column-family, key, value])正如您从输入格式中看到的,我必须获取原始数据集并迭代所有键,使用send函数调用发送每个键/值对。
阅读spark streaming编程指南中的“使用foreachRDD的设计模式”一节http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13
在数据集之外做一些事情时,似乎建议使用foreachRDD。在我的例子中,我希望通过网络将数据写入HBase,因此我对我的流数据使用foreachRDD,并调用将处理数据发送的函数:
stream.foreachRDD(lambda k: process(k))我现在对spark函数的理解非常有限,所以我无法想出一种方法来迭代我的原始数据集来使用我的write函数。如果它是一个python iterable,我可以这样做:
def process(rdd):
for key, value in my_rdd.iteritems():
writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))其中rowkey是通过在rdd本身中找到它来获得的。
rdd.map(lambda x: x['rowkey'])如何在pyspark中完成process()要做的事情?我看到一些使用foreach的例子,但我不能让它做我想做的事情。
发布于 2016-05-28 14:05:02
当你的writeToHBase函数需要一个rdd作为论据时,你为什么要迭代rdd呢?只需在流程函数中调用writeToHBase(rdd),就行了。
如果需要从rdd获取每条记录,可以调用
def processRecord(record):
print(record)
rdd.foreach(processRecord)在processRecord函数中,您将获得要处理的单个记录。
https://stackoverflow.com/questions/37492402
复制相似问题