首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark任务需要很长时间才能运行三个爆炸性函数

PySpark任务需要很长时间才能运行三个爆炸性函数
EN

Stack Overflow用户
提问于 2019-12-18 15:41:59
回答 2查看 936关注 0票数 3

我试图运行一个语义处理的Spark程序,但是它停留在第二阶段。我想知道这里有什么问题吗?

代码语言:javascript
复制
# create Spark Context
spark = SparkSession.builder.master("Semantic Processing")\
        .config('spark.master', 'local')\
        .getOrCreate()
sqlc = SQLContext(spark)
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

# read in csv
df = sqlc.read.format('csv')\
    .options(header='true', inferSchema='true', sep='|')\
    .load('data/VzW_PSSData_07012015.csv')

# udf for extracting triples

triples_schema = StructType([
    StructField('subjects', ArrayType(StringType(), True), True),
    StructField('actions', ArrayType(StringType(), True), True),
    StructField('objects', ArrayType(StringType(), True), True)
])

triples_udf = udf(lambda x: get_triples(x), triples_schema)

start = time.time()

triples = df.select('InteractionID', triples_udf('Notes').alias('Triples'))
triples = triples.withColumn('Subject', explode('Triples.subjects'))\
                    .withColumn('Action', explode('Triples.actions'))\
                    .withColumn('Object', explode('Triples.objects'))\
                    .select('InteractionID', 'Subject', 'Action', 'Object')

triples.explain()

# write to csv
triples.coalesce(1).write.format('csv').save('SPARK_triples_output.csv', header='true', sep='|')

这就是get_triples()所做的:

代码语言:javascript
复制
def get_triples(note):
    """extract subject, verb, and object"""
    nlp = spacy.load("en_core_web_sm")

    if note == None:
        return None
    else:
        doc = nlp(str(note))
        subj_list = []
        verb_list = []
        obj_list = []
        for sent in doc.sents:
            sent = clean_text(str(sent))
            doc = nlp(sent)
            # find each verb
            for tok in doc:
                subj = ""
                verb = ""
                obj = ""
                mod = ""
                prep = ""
                # check if current token is a verb
                if tok.pos == VERB:
                    verb = tok.text

                    # store right tree into object
                    obj = ' '.join([t.text for t in tok.rights])

                    # for each verb, check children for objects: dobj and prep
                    # find pred, modifiers, negs, adv for verb
                    for child in tok.children:
                        if child.dep_ == 'nsubj':
                            subj = child.text

                        elif (child.dep_ == 'aux'):
                            mod += child.text + ' '

                        elif (child.dep_ == 'neg'):
                            mod += child.text + ' '

                        elif (child.dep_ == 'prep'):
                            prep = child.text

                    verb = mod + verb + ' ' + prep

                    subj_list.append(subj)
                    verb_list.append(verb)
                    obj_list.append(obj)

        return subj_list, verb_list, obj_list

它读取文本记录,查找主题、动词和对象,并返回三个列表:主题、动词、对象。我的数据集大约为460 k记录,为247.7MB。

示例:

代码语言:javascript
复制
Input:
'customer called in needing help with activating their  replacement phone did an esn swap  in acss customer needing to pay their bill once bill was payed activation went through did a test call and sent out a test text issue resolved '

我想要输出到一个dataframe中,然后写到如下所示的CSV:

代码语言:javascript
复制
InteractionID   Subject     Action              Object
0   42671331    customer    said                lagging
0   42671331                has 
0   42671331                been    
0   42671331    device      has been lagging    freezing turned
0   42671331                moving              slower

这是printSchema的输出

代码语言:javascript
复制
root                                                                            
 |-- InteractionID: integer (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Action: string (nullable = true)
 |-- Object: string (nullable = true)

基本上,我的计划是:

在CSV file

  • declaring中读取
  1. ,一个UDF语义处理string
  2. Creating,选择InteractionID并在列上运行UDF,在根列Triples
  3. Selecting InteractionID内的三个数组列,以及三个爆发columns
  4. Writing到一个新的CSV

这是explain的输出

代码语言:javascript
复制
== Physical Plan ==                                                             
Generate explode(Triples#131.objects), [InteractionID#17, Subject#136, Action#142], false, [Object#149]
+- Generate explode(Triples#131.actions), [InteractionID#17, Triples#131, Subject#136], false, [Action#142]
   +- Generate explode(Triples#131.subjects), [InteractionID#17, Triples#131], false, [Subject#136]
      +- *(2) Project [InteractionID#17, pythonUDF0#159 AS Triples#131]
         +- BatchEvalPython [<lambda>(Notes#65)], [InteractionID#17, Notes#65, pythonUDF0#159]
            +- *(1) FileScan csv [InteractionID#17,Notes#65] Batched: false, Format: CSV, Location: InMemory
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-12-18 16:04:50

.coalesce(1)的调用很可能是造成问题的原因,但是在您试图做的事情中还有其他的问题来源。

合并:,我假设您的目标是获得一个文件CSV。馊主意。火花是关于并行计算的。使用coalesce(1),您可以将所有数据放在一个分区(一台核心,一台机器)中,这就是为什么要花费这么长的时间。你甚至可能得到一个“内存不足的错误”。

2.爆炸性:您的工作是将数据集的大小乘以很多。每一行都生成A*B*C lines,其中A是主题的数量,B是对象的数量,C是操作的数量。一个分区的大量数据。如果每个数组计算5个元素,则最终将数据集的大小乘以5^3=125,这意味着数据集的大小为34 by。就记录而言,460K*125=57.5M。对于一台笔记本电脑来说,这可能是个大问题。

3. python:除了使用UDF外,pyspark和scala之间的性能差距通常很小。事实上,与那些知道如何优化的SparkSQL函数相比,即使在scala中,UDF也会降低性能。当UDF在scala中时,它会慢一些。当它在python中时,它的catastrophically会慢一些。

但这些只是指点。你是说它没有进展。在读取CSV之后尝试调用repartition(1000),并删除coalesce(1)。它将把作业分成更小的部分(您甚至可以尝试大于1000的值),如果某些任务正在完成,则使用SparkUI查看。

票数 4
EN

Stack Overflow用户

发布于 2019-12-18 19:11:55

看起来你所产生的数据比你想要的要多。用于数据

代码语言:javascript
复制
1 dog eats meat
1 bat hits ball

当你收集清单和爆炸,你会得到

代码语言:javascript
复制
1 dog eats meat
1 dog eats ball
1 dog hits meat
1 dog hits ball
1 bat eats meat
1 bat eats ball
1 bat hits meat
1 bat hits ball

当你的列表中有10个项目时,你可以看到会发生什么。对于您的情况,返回一个3元组列表比3个列表更合理。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59395496

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档