In my RDD I have a list of tuples, each of which looks like this:

(UserID, ProductID1, ProductID2, ProductID3, …)
For example:

('A395BORC6FGVXV', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0', 'B006K2ZZ7K']), ('A3SGXH7AUHU8GW', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0']), ('AZOF9E17RGZH8', ['B001GVISJM']), ('ARYVQL4N737A1', ['B001GVISJM'])

Whenever I find two users who have rated two or more products in common, I have to build a result tuple and put it into a new list:
(UserID1, UserID2, CommonProduct1, CommonProduct2, …)
For my example the only output should be a list with a single element, like this:

('A395BORC6FGVXV', 'A3SGXH7AUHU8GW', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0'])

How can I do this in Spark Core?
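Independent of Spark, the core logic is a pairwise set intersection over all user pairs. A minimal plain-Python sketch of that logic, assuming the data is small enough to compare every pair directly (on a real RDD you would distribute this, e.g. via a self-join):

```python
from itertools import combinations

data = [('A395BORC6FGVXV', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0', 'B006K2ZZ7K']),
        ('A3SGXH7AUHU8GW', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0']),
        ('AZOF9E17RGZH8', ['B001GVISJM']),
        ('ARYVQL4N737A1', ['B001GVISJM'])]

result = []
for (u1, p1), (u2, p2) in combinations(data, 2):
    common = [p for p in p1 if p in set(p2)]  # intersection, keeping p1's order
    if len(common) >= 2:                      # two or more common products
        result.append((u1, u2, common))

print(result)
# [('A395BORC6FGVXV', 'A3SGXH7AUHU8GW', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0'])]
```

This brute-force version is quadratic in the number of users, which is why the answer below inverts the data instead of comparing pairs.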
Posted on 2022-05-03 22:42:04
First, build a DataFrame from the sample data:

```python
df = spark.createDataFrame(
    [('A395BORC6FGVXV', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0', 'B006K2ZZ7K']),
     ('A3SGXH7AUHU8GW', ['B001E4KFG0', 'B00813GRG4', 'B000LQOCH0']),
     ('AZOF9E17RGZH8', ['B001GVISJM']),
     ('ARYVQL4N737A1', ['B001GVISJM'])],
    ('UserID', 'Product'))
df.show(truncate=False)
```

Solution
```python
from pyspark.sql.functions import (array_join, col, collect_list, concat,
                                   explode, lit, size)

df = (df.withColumn('Product', explode('Product'))      # one product per row
        .groupby('Product')
        .agg(collect_list('UserID').alias('UserID'))    # users who rated each product
        .groupby('UserID')
        .agg(collect_list('Product').alias('Product'))  # products shared by each group of users
        .where(size(col('Product')) >= 2)               # keep groups with two or more common products
        .select(concat(lit('('), array_join('UserID', ','),
                       lit(' ['), array_join('Product', ','),
                       lit('])')).alias('string')))     # build the required output string
df.show(truncate=False)
```

Result
+------------------------------------------------------------------+
|string |
+------------------------------------------------------------------+
|(A395BORC6FGVXV,A3SGXH7AUHU8GW [B000LQOCH0,B00813GRG4,B001E4KFG0])|
+------------------------------------------------------------------+

https://stackoverflow.com/questions/72105114