I am trying to run a Spark program that does some semantic processing, but it gets stuck at the second stage. Can anyone tell me what is going wrong here?
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StructType, StructField, ArrayType, StringType
import time

# create Spark session (appName, not master, takes the application name)
spark = SparkSession.builder.appName("Semantic Processing")\
    .config('spark.master', 'local')\
    .getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

# read in csv
df = spark.read.format('csv')\
    .options(header='true', inferSchema='true', sep='|')\
    .load('data/VzW_PSSData_07012015.csv')

# udf for extracting triples
triples_schema = StructType([
    StructField('subjects', ArrayType(StringType(), True), True),
    StructField('actions', ArrayType(StringType(), True), True),
    StructField('objects', ArrayType(StringType(), True), True)
])
triples_udf = udf(lambda x: get_triples(x), triples_schema)

start = time.time()
triples = df.select('InteractionID', triples_udf('Notes').alias('Triples'))
triples = triples.withColumn('Subject', explode('Triples.subjects'))\
    .withColumn('Action', explode('Triples.actions'))\
    .withColumn('Object', explode('Triples.objects'))\
    .select('InteractionID', 'Subject', 'Action', 'Object')
triples.explain()

# write to csv
triples.coalesce(1).write.format('csv').save('SPARK_triples_output.csv', header='true', sep='|')

Here is what get_triples() does:
import spacy
from spacy.symbols import VERB

def get_triples(note):
    """extract subject, verb, and object"""
    nlp = spacy.load("en_core_web_sm")  # note: the model is reloaded on every call
    if note is None:
        return None
    else:
        doc = nlp(str(note))
        subj_list = []
        verb_list = []
        obj_list = []
        for sent in doc.sents:
            sent = clean_text(str(sent))
            doc = nlp(sent)
            # find each verb
            for tok in doc:
                subj = ""
                verb = ""
                obj = ""
                mod = ""
                prep = ""
                # check if current token is a verb
                if tok.pos == VERB:
                    verb = tok.text
                    # store right tree into object
                    obj = ' '.join([t.text for t in tok.rights])
                    # for each verb, check children for objects: dobj and prep
                    # find pred, modifiers, negs, adv for verb
                    for child in tok.children:
                        if child.dep_ == 'nsubj':
                            subj = child.text
                        elif child.dep_ == 'aux':
                            mod += child.text + ' '
                        elif child.dep_ == 'neg':
                            mod += child.text + ' '
                        elif child.dep_ == 'prep':
                            prep = child.text
                    verb = mod + verb + ' ' + prep
                    subj_list.append(subj)
                    verb_list.append(verb)
                    obj_list.append(obj)
        return subj_list, verb_list, obj_list

It reads a text note, finds the subjects, verbs, and objects, and returns three lists: subjects, verbs, objects. My dataset is about 460k records, 247.7 MB.
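To make the return shape concrete, here is a minimal, hypothetical stand-in for get_triples (the name toy_get_triples and the whitespace-splitting rule are made up for illustration; the real function uses spaCy). It shows that the UDF hands Spark three parallel lists whose elements are related only by position:

```python
def toy_get_triples(note):
    """Hypothetical stand-in for get_triples: treats each sentence as
    'subject verb object...' and returns three parallel lists."""
    if note is None:
        return None
    subj_list, verb_list, obj_list = [], [], []
    for sent in note.split('.'):
        words = sent.split()
        if len(words) >= 3:
            subj_list.append(words[0])          # first word as subject
            verb_list.append(words[1])          # second word as verb
            obj_list.append(' '.join(words[2:]))  # the rest as object

    return subj_list, verb_list, obj_list

print(toy_get_triples("dog eats meat. bat hits ball"))
# → (['dog', 'bat'], ['eats', 'hits'], ['meat', 'ball'])
```

Element i of each list belongs to the same sentence, which matters for how explode treats the three arrays later.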
Example:

Input:

'customer called in needing help with activating their replacement phone did an esn swap in acss customer needing to pay their bill once bill was payed activation went through did a test call and sent out a test text issue resolved '

I want the output in a dataframe, then written to a CSV like this:
InteractionID Subject Action Object
0 42671331 customer said lagging
0 42671331 has
0 42671331 been
0 42671331 device has been lagging freezing turned
0 42671331 moving slower

Here is the output of printSchema():
root
|-- InteractionID: integer (nullable = true)
|-- Subject: string (nullable = true)
|-- Action: string (nullable = true)
|-- Object: string (nullable = true)

Basically, my plan was to:
1. read in the CSV file;
2. run the UDF on the Notes column (keeping InteractionID), producing a root column Triples that contains the three array columns;
3. explode the three arrays into separate columns.
Here is the output of explain():
== Physical Plan ==
Generate explode(Triples#131.objects), [InteractionID#17, Subject#136, Action#142], false, [Object#149]
+- Generate explode(Triples#131.actions), [InteractionID#17, Triples#131, Subject#136], false, [Action#142]
+- Generate explode(Triples#131.subjects), [InteractionID#17, Triples#131], false, [Subject#136]
+- *(2) Project [InteractionID#17, pythonUDF0#159 AS Triples#131]
+- BatchEvalPython [<lambda>(Notes#65)], [InteractionID#17, Notes#65, pythonUDF0#159]
+- *(1) FileScan csv [InteractionID#17,Notes#65] Batched: false, Format: CSV, Location: InMemory

Posted on 2019-12-18 16:04:50
The call to .coalesce(1) is most likely the cause of the problem, but there are other sources of trouble in what you are trying to do.
1. Coalesce: I assume your goal is to get a single CSV file. Bad idea: Spark is all about parallel computation. With coalesce(1) you put all the data into a single partition (one core, one machine), which is why it takes so long. You may even end up with an out-of-memory error.
2. Explode: you are multiplying the size of your dataset considerably. Each row generates A*B*C output rows, where A is the number of subjects, B the number of objects, and C the number of actions. That is a lot of data for one partition. If each array averages 5 elements, you multiply the size of the dataset by 5^3 = 125, which means roughly 30 GB of data (247.7 MB × 125). In terms of records, 460K × 125 = 57.5M. That is probably too much for a single laptop.
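The blow-up is easy to sanity-check with a little arithmetic (assuming the average of 5 elements per array used above):

```python
# size multiplication from three chained explodes, assuming an average
# of 5 subjects, 5 actions, and 5 objects per row
elements_per_array = 5
blowup = elements_per_array ** 3           # 125 output rows per input row

input_rows = 460_000
input_mb = 247.7

print(blowup)                              # 125
print(input_rows * blowup)                 # 57500000 (57.5M rows)
print(round(input_mb * blowup / 1024, 1))  # 30.2 (GB)
```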
3. Python: aside from UDFs, the performance gap between PySpark and Scala is usually small. In fact, UDFs degrade performance even in Scala, compared with the built-in Spark SQL functions that Spark knows how to optimize. When the UDF is in Scala, it is slower; when it is in Python, it is catastrophically slower, because every row has to be serialized between the JVM and a Python worker process.
But these are just pointers. You said the job makes no progress: try calling repartition(1000) right after reading the CSV, and remove the coalesce(1). That splits the job into smaller tasks (you can even try values larger than 1000), and you can use the Spark UI to see whether individual tasks are completing.
Posted on 2019-12-18 19:11:55
It looks like you are generating a lot more data than you intend. For the input data

1 dog eats meat
1 bat hits ball

once you collect the lists and explode them, you get

1 dog eats meat
1 dog eats ball
1 dog hits meat
1 dog hits ball
1 bat eats meat
1 bat eats ball
1 bat hits meat
1 bat hits ball

You can imagine what happens when each of your lists has 10 items. In your case, returning a single list of 3-tuples makes more sense than returning three separate lists.
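In plain Python, the difference between the two shapes can be sketched with the toy rows above: three independently exploded arrays behave like a cross product, while a list of 3-tuples keeps related elements together:

```python
from itertools import product

subjects = ['dog', 'bat']
actions = ['eats', 'hits']
objects = ['meat', 'ball']

# three chained explodes behave like a cross product: 2*2*2 = 8 rows
cross = list(product(subjects, actions, objects))
print(len(cross))  # 8

# a single list of 3-tuples keeps rows aligned: 2 rows
zipped = list(zip(subjects, actions, objects))
print(zipped)  # [('dog', 'eats', 'meat'), ('bat', 'hits', 'ball')]
```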
https://stackoverflow.com/questions/59395496