I have a PySpark RDD with about 2 million elements. I cannot collect them all at once, because doing so raises an OutOfMemoryError.
How can I collect them in batches?
Here is a potential solution, though I suspect there is a better way: collect one batch (using take, https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.take.html#pyspark.RDD.take), then remove all elements of that batch from the RDD (using filter, https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.filter.html#pyspark.RDD.filter, though again I suspect there is a better way to do this), and repeat until nothing is left to collect.
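The take-then-filter loop described above can be sketched in plain Python against a list standing in for the RDD (`take` becomes a slice, `filter` becomes a comprehension). This is only an illustration of the control flow, not PySpark code, and `collect_in_batches` is a hypothetical helper name:

```python
def collect_in_batches(elements, batch_size):
    """Emulate the take-then-filter batching loop on a plain list.

    remaining[:batch_size] plays the role of rdd.take(batch_size);
    the comprehension plays the role of rdd.filter(...).
    Note: membership-based removal would drop duplicates on a real
    dataset, and each filter pass over an RDD triggers a recomputation,
    which is why the asker suspects there is a better way.
    """
    remaining = list(elements)
    batches = []
    while remaining:
        batch = remaining[:batch_size]
        collected = set(batch)
        remaining = [e for e in remaining if e not in collected]
        batches.append(batch)
    return batches

print(collect_in_batches(range(10), 4))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```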
Posted on 2021-10-12 12:05:21
I'm not sure whether this is a good solution, but you can zip your RDD with an index and then filter on that index to collect items in batches:
big_rdd = spark.sparkContext.parallelize([str(i) for i in range(0, 100)])
big_rdd_with_index = big_rdd.zipWithIndex()
batch_size = 10
batches = []
for i in range(0, 100, batch_size):
    batches.append(big_rdd_with_index.filter(lambda element: i <= element[1] < i + batch_size).map(lambda element: element[0]).collect())
for l in batches:
    print(l)
Output:
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
['10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
['20', '21', '22', '23', '24', '25', '26', '27', '28', '29']
['30', '31', '32', '33', '34', '35', '36', '37', '38', '39']
['40', '41', '42', '43', '44', '45', '46', '47', '48', '49']
['50', '51', '52', '53', '54', '55', '56', '57', '58', '59']
['60', '61', '62', '63', '64', '65', '66', '67', '68', '69']
['70', '71', '72', '73', '74', '75', '76', '77', '78', '79']
['80', '81', '82', '83', '84', '85', '86', '87', '88', '89']
['90', '91', '92', '93', '94', '95', '96', '97', '98', '99']
https://stackoverflow.com/questions/69538918
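One subtlety worth noting in the loop above: the lambda captures the loop variable `i` by reference. Because `.collect()` runs inside the loop, each lambda is serialized while `i` still holds the intended value, so the code works; but if the filtered RDDs were built in the loop and only collected afterwards, every lambda would see the final value of `i`. Binding the value with a default argument (`lambda element, i=i: ...`) avoids the pitfall. The effect can be shown with plain Python, no Spark required:

```python
# Late binding: every lambda looks up i when called, after the loop
# has finished, so all three see the final value i == 2.
late = [(lambda x: x + i) for i in range(3)]
print([f(0) for f in late])   # [2, 2, 2]

# Binding i as a default argument captures its value per iteration.
bound = [(lambda x, i=i: x + i) for i in range(3)]
print([f(0) for f in bound])  # [0, 1, 2]
```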