我正在尝试对Spark中的一些数据使用FPGrowth函数。我在这里测试了这个示例,没有任何问题:https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html
但是,我的数据集来自hive
data = hiveContext.sql('select transactionid, itemid from transactions')
model = FPGrowth.train(data, minSupport=0.1, numPartitions=100)此失败的原因是方法不存在:
py4j.protocol.Py4JError: An error occurred while calling o764.trainFPGrowthModel. Trace:
py4j.Py4JException: Method trainFPGrowthModel([class org.apache.spark.sql.DataFrame, class java.lang.Double, class java.lang.Integer]) does not exist因此,我将其转换为RDD:
data=data.rdd现在我开始收到一些奇怪的pickle序列化错误。
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)然后我开始看类型。在本例中,数据通过平面地图运行。这将返回与RDD不同的类型。
flatmap返回的RDD类型: pyspark.rdd.PipelinedRDD
hiveContext返回的RDD类型: pyspark.rdd.RDD
FPGrowth似乎只适用于PipelinedRDD。有没有什么方法可以把普通的RDD转换成PipelinedRDD?
谢谢!
发布于 2016-04-29 15:46:10
好吧,我的查询是错误的,但我将其更改为使用collect_set,然后我通过执行以下操作设法避免了类型错误:
data=data.map(lambda row: row[0])https://stackoverflow.com/questions/36927624
复制相似问题