首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >大型查找表在单台MultiCore机上的PySpark应用

大型查找表在单台MultiCore机上的PySpark应用
EN

Stack Overflow用户
提问于 2015-10-06 22:34:23
回答 1查看 798关注 0票数 1

我有一个大型查找表,它将整数作为键,字符串列表作为值。我需要这个查找表来做一些过滤和转换数据,我通过火花加载。

代码语言:javascript
复制
import numpy as np
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")

sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)

在启动火花放电时,我甚至使用了--driver-memory 20g选项。

我的机器有500 GB内存和27个核心。我首先在内存中加载一个名为lookup_tbl的字典,它有17457954行。

当我试图运行下面的代码时,任何输出都不会超过10分钟。在等了这么久之后,我关闭了这个过程。我需要查表功能。我甚至尝试过使用broadcast特性。

代码语言:javascript
复制
sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "\t".join(k[1:]))):
  x = x.split('\t')
  return transform(x)


def check_self(x):
  from_id = x[0]
  to_id = x[1]
  self_ = 1
  try:
    common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
  except KeyError:
    common_items = set()
  if len(common_items ) < 1:
    common_items = set("-")
    self_ = 0
  return (((from_id, to_id, k, self_) for k in common_items ))

pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "\t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")

这是火花的问题,还是我没有正确运行?另外,我也尝试过为executor和driver (~20g)设置各种值,但是没有得到任何改进。

据我所知,spark首先尝试序列化此字典,然后再将其发送到所有本地进程。有什么方法可以从普通的位置使用这本词典吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-10-07 07:39:06

首先,要访问广播变量,必须使用它的value属性:

代码语言:javascript
复制
# You can use get instead of catching KeyError
s1 = set(lookup_tbl.value.get(from_id, set()))
s2 = set(lookup_tbl.value.get(to_id, set()))
common_items = s1.intersection(s2)

为了避免广播,您可以在lookup_tbl中本地加载mapPartitions

代码语言:javascript
复制
def check_partition(iter):
   lookup_tbl = ...
   for x in iter:
       yield check_self

identity = lambda x: x
pair = (sc.textFile(...)
    .map(lambda x: clean_data(...)
    .mapPartitions(check_partition)
    .flatMap(identity))

如果lookup_tbl是相对较大的,那么它仍然可以是相当大的expensive.There,有相当多的方法可以处理这个问题:

  1. 使用SQLite连接而不是局部变量。 导入sqlite3 conn =sqlite3.Connection(‘path/to/lookup.db’)c.execute(“从查找中选择键,其中id = '%s'”% from_id) s1 = {x for x in c.fetchall()} c.execute(“从查找中选择键,其中id = '%s'”% to_id) s2 = {x for x in c.fetchall()} common_items =s1交叉(S2) 它很容易设置,如果数据被正确地索引,它应该足够快。
  2. 使用单个数据库服务器进行查找。MongoDB应该工作得很好,通过适当的内存映射,您可以显着地减少内存占用。
  3. 使用join代替广播 交换= lambda:(x1,x) def reshape1( record ):(k1,(items,k2)) =记录返回(k2,(k1,items)) def reshape2(记录):(k1,(items1,(k2,items2)=记录返回(k1,k2,set(items1) & set(items2))对=sc.textFile(.).map(lambda x: sc.textFile(.)N=.#分区数lookup_rdd = lookup_rdd.join(lookup_rdd.join(pairs).map(reshape1)).map(reshape2)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32980908

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档