首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用pyspark合并MultiLine记录

使用pyspark合并MultiLine记录
EN

Stack Overflow用户
提问于 2021-05-18 09:58:14
回答 2查看 61关注 0票数 0

我有一个像这样的数据集,它是一个多行和多分隔符。我正在使用spark 2.3来阅读相同的内容。

我想转换成具有不同分隔符的单行文件。

代码语言:javascript
复制
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
12223212|*|1212|*|0|*|0|*||*|ABD
[c0re] score 
12-- 12--P|*|1234|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
2334343|*|1212|*|0|*|0|*||*|ABD
[c0re] score 
12-- 12--P|*|1223|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|

预期输出

代码语言:javascript
复制
1223232~1212~0~0~~ABDP~1234~asda
12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda
1223232~1212~0~0~~ABDP~1234~asda
2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda
1223232~1212~0~0~~ABDP~1234~asda

此文件最初是使用UNIX sed命令进行转换的。但是,文件的大小增长到50 of,服务器被挂起。

代码语言:javascript
复制
while read myline
do
sed -i 's/\r//g' $myline
sed -i ':a;N;$!ba;s/\n//g' $myline # removes new line
sed -i 's+|\#\#|+\n+g' $myline #replaces all |##| as new line
sed -i 's/~/-/g' $myline 
sed -i 's/\\/  /g' $myline
sed -i 's+|\*|+~+g' $myline # converts all |*| as ~
sed -i 's+|\#\#|++g' $myline
done < filename

这必须用多字符分隔符和数据中的换行符重写为spark,它无法解决问题。

我有火花代码写成,但不确定如何处理|#|作为换行符或eod,而不是换行符和reaplce换行符可能是

代码语言:javascript
复制
df = sc.textFile(source_filename).map(lambda x: x.split("|*|")).toDF(header_column)
EN

回答 2

Stack Overflow用户

发布于 2021-05-18 11:25:18

Spark的split中的分隔符实际上是一个正则表达式,因此您需要转义管道|和星号*符号。

代码语言:javascript
复制
from pyspark.sql import functions as F

(spark
    .read.text('a.txt')
    .withColumn('tmp', F.explode(F.split(F.col('value'), '\|##\|')))
    .withColumn('tmp', F.regexp_replace(F.col('tmp'), '\|\*\|', '~'))
    .select('tmp')
    .show(10, False)
)

# EDIT: since the data is **not** single line, hence the updated code below.
# I created "fake ID" to group records together based on its indexes
(spark
    .read.text('a.txt')
    .withColumn('id', F.monotonically_increasing_id())
    .withColumn('grp', (F.col('id') / 4).cast('int'))
    .groupBy('grp')
    .agg(F.collect_list('value').alias('value'))
    .withColumn('value', F.concat_ws('', 'value'))
    .withColumn('value', F.explode(F.split(F.col('value'), '\|##\|')))
    .withColumn('value', F.regexp_replace(F.col('value'), '\|\*\|', '~'))
    .where(F.col('value') != '')
    .drop('grp')
    .show(10, False)
)

# +-------------------------------------------------------+
# |tmp                                                    |
# +-------------------------------------------------------+
# |1223232~1212~0~0~~ABDP~1234~asda                       |
# |12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda|
# |1223232~1212~0~0~~ABDP~1234~asda                       |
# |2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda |
# |1223232~1212~0~0~~ABDP~1234~asda                       |
# |                                                       |
# +-------------------------------------------------------+
票数 0
EN

Stack Overflow用户

发布于 2021-05-21 19:15:05

我正在设法按文件分区执行您的数据,但这也需要一个大的集群,因为您提到过它有超过30 by的文件。

代码语言:javascript
复制
from pyspark.sql.window import Window
import pyspark.sql.functions as f


df = spark.read.csv('your_path', schema='value string')
df = df.withColumn('filename', f.input_file_name())
df = df.repartition('filename')

df = df.withColumn('index', f.monotonically_increasing_id())

w = Window.partitionBy('filename').orderBy('index')
df = df.withColumn('group', f.sum(f.lag('value', default=False).over(w).endswith(f.lit('|##|')).cast('int')).over(w))

df = df.withColumn('value', f.regexp_replace('value', '\|\*\|', '~'))
df = df.withColumn('value', f.regexp_replace('value', '\|##\|', ''))

df = df.groupBy('group').agg(f.concat_ws('', f.collect_list('value')).alias('value'))
(df
 .select('value')
 .sort('group')
 .show(truncate=False))

输出

代码语言:javascript
复制
+-------------------------------------------------------+
|value                                                  |
+-------------------------------------------------------+
|1223232~1212~0~0~~ABDP~1234~asda                       |
|12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda|
|1223232~1212~0~0~~ABDP~1234~asda                       |
|2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda |
|1223232~1212~0~0~~ABDP~1234~asda                       |
+-------------------------------------------------------+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67579000

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档