我使用星火1.6.0与火花流,并有一个广泛的操作问题。
代码示例:有一个名为"a“的RDD,它的类型是:类‘pysck.rdd.L.13 is’。
收到的"a“是:
# Load a text file and convert each line to a Row.
lines = sc.textFile(filename)
parts = lines.map(lambda l: l.split(","))
clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(clients)
schemaPeople.registerTempTable("clients")
client_list = sqlContext.sql("SELECT * FROM clients")及之后:
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))第二部分"b“类型为'pyspark.streaming.dstream.TransformedDStream'.类。我收到了水槽的"b“:
DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))和
b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))问题是:当我尝试以以下方式加入时,:
mult = b.transform(lambda rdd: rdd.join(a))我的应用程序挂在这个阶段的(现在我在b.pprint()和.join()之前显示屏幕)

但当我说:

因此:
我有RDDs "a“和"test”。DStream "b".我可以乘:
但我不能做b*a。
任何帮助都是非常感谢的!谢谢!
发布于 2017-02-21 02:50:08
根据user6910411的建议,我将"a“缓存为
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache() 问题就解决了。
https://stackoverflow.com/questions/42291463
复制相似问题