首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >应用程序挂起,当我从DStream加入DStream和RDD时

应用程序挂起,当我从DStream加入DStream和RDD时
EN

Stack Overflow用户
提问于 2017-02-17 07:04:29
回答 1查看 449关注 0票数 2

我使用星火1.6.0与火花流,并有一个广泛的操作问题。

代码示例:有一个名为"a“的RDD,它的类型是:类‘pysck.rdd.L.13 is’。

收到的"a“是:

代码语言:javascript
复制
# Load a text file and convert each line to a Row.
    lines = sc.textFile(filename)
    parts = lines.map(lambda l: l.split(","))
    clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = sqlContext.createDataFrame(clients)
    schemaPeople.registerTempTable("clients")

    client_list = sqlContext.sql("SELECT * FROM clients")

及之后:

代码语言:javascript
复制
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))

第二部分"b“类型为'pyspark.streaming.dstream.TransformedDStream'.类。我收到了水槽的"b“:

代码语言:javascript
复制
DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))

代码语言:javascript
复制
b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))

问题是:当我尝试以以下方式加入时,

代码语言:javascript
复制
mult = b.transform(lambda rdd: rdd.join(a))

我的应用程序挂在这个阶段的(现在我在b.pprint()和.join()之前显示屏幕)

但当我说:

  1. 声明RDD“测试”: test = sc.parallelize(range(1,100000)).map(lambda:(k,'value')) 并做: mult0 =a.join(测试) mult = b.transform(lambda :rdd.join(Mult0)) 然后它起作用了(!!):

  1. 我也能做到: mult0 = b.transform(lambda :rdd.join(测试))

因此

我有RDDs "a“和"test”。DStream "b".我可以乘:

  • A*测试*b
  • B*试验

但我不能做b*a。

任何帮助都是非常感谢的!谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-02-21 02:50:08

根据user6910411的建议,我将"a“缓存为

代码语言:javascript
复制
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache() 

问题就解决了。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42291463

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档