我正在使用PySpark将我的遗留Python代码转换为Spark。
我希望获得一个等同于以下内容的PySpark:
usersofinterest = actdataall[actdataall['ORDValue'].isin(orddata['ORDER_ID'].unique())]['User ID']actdataall和orddata都是Spark数据帧。
考虑到与toPandas()函数相关的缺点,我不想使用它。
发布于 2017-08-16 17:21:44
首先,让我们创建一个dataframe,其中包含我们想要保留的订单ID:
orderid_df = orddata.select(orddata.ORDER_ID.alias("ORDValue")).distinct()
现在让我们将它与actdataall数据帧连接起来:
usersofinterest = actdataall.join(orderid_df,"ORDValue","inner").select('User orderid_df你的订单is的目标列表很小,那么你可以使用furianpandit的帖子中提到的pyspark.sql isin函数,在使用它之前不要忘记广播你的变量(spark会将对象复制到每个节点,使他们的任务更快):
sc.broadcast(orderid_list) = orddata.select('ORDER_ID').distinct().rdd.flatMap(lambda x:x) orderid_list =.collect()
发布于 2017-08-21 12:34:42
你的代码最直接的翻译方式是:
from pyspark.sql import functions as F
# collect all the unique ORDER_IDs to the driver
order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()]
# filter ORDValue column by list of order_ids, then select only User ID column
usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids)).select('User ID')然而,只有当'ORDER_ID‘的数量非常小(可能小于100,000左右)时,你才应该像这样过滤。
如果“ORDER_ID”的数量很大,你应该使用广播变量将order_ids列表发送给每个执行器,这样它就可以在本地与order_ids进行比较,从而加快处理速度。注意,即使“ORDER_ID”很小,这也是可行的。
order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()]
order_ids_broadcast = sc.broadcast(order_ids) # send to broadcast variable
usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids_broadcast.value)).select('User ID')有关广播变量的更多信息,请查看:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
发布于 2017-07-09 20:39:58
因此,您有两个spark数据帧。一个是actdataall,另一个是orddata,然后使用下面的命令来获得你想要的结果。
usersofinterest = actdataall.where(actdataall['ORDValue'].isin(orddata.select('ORDER_ID').distinct().rdd.flatMap(lambda x:x).collect()[0])).select('User ID')https://stackoverflow.com/questions/44444632
复制相似问题