我有一个像这样的数据集,它是一个多行和多分隔符。我正在使用spark 2.3来阅读相同的内容。
我想转换成具有不同分隔符的单行文件。
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
12223212|*|1212|*|0|*|0|*||*|ABD
[c0re] score
12-- 12--P|*|1234|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
2334343|*|1212|*|0|*|0|*||*|ABD
[c0re] score
12-- 12--P|*|1223|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|预期输出
1223232~1212~0~0~~ABDP~1234~asda
12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda
1223232~1212~0~0~~ABDP~1234~asda
2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda
1223232~1212~0~0~~ABDP~1234~asda此文件最初是使用UNIX sed命令进行转换的。但是,文件的大小增长到50 of,服务器被挂起。
while read myline
do
sed -i 's/\r//g' $myline
sed -i ':a;N;$!ba;s/\n//g' $myline # removes new line
sed -i 's+|\#\#|+\n+g' $myline #replaces all |##| as new line
sed -i 's/~/-/g' $myline
sed -i 's/\\/ /g' $myline
sed -i 's+|\*|+~+g' $myline # converts all |*| as ~
sed -i 's+|\#\#|++g' $myline
done < filename这必须用多字符分隔符和数据中的换行符重写为spark,它无法解决问题。
我有火花代码写成,但不确定如何处理|#|作为换行符或eod,而不是换行符和reaplce换行符可能是
df = sc.textFile(source_filename).map(lambda x: x.split("|*|")).toDF(header_column)发布于 2021-05-18 11:25:18
Spark的split中的分隔符实际上是一个正则表达式,因此您需要转义管道|和星号*符号。
from pyspark.sql import functions as F
(spark
.read.text('a.txt')
.withColumn('tmp', F.explode(F.split(F.col('value'), '\|##\|')))
.withColumn('tmp', F.regexp_replace(F.col('tmp'), '\|\*\|', '~'))
.select('tmp')
.show(10, False)
)
# EDIT: since the data is **not** single line, hence the updated code below.
# I created "fake ID" to group records together based on its indexes
(spark
.read.text('a.txt')
.withColumn('id', F.monotonically_increasing_id())
.withColumn('grp', (F.col('id') / 4).cast('int'))
.groupBy('grp')
.agg(F.collect_list('value').alias('value'))
.withColumn('value', F.concat_ws('', 'value'))
.withColumn('value', F.explode(F.split(F.col('value'), '\|##\|')))
.withColumn('value', F.regexp_replace(F.col('value'), '\|\*\|', '~'))
.where(F.col('value') != '')
.drop('grp')
.show(10, False)
)
# +-------------------------------------------------------+
# |tmp |
# +-------------------------------------------------------+
# |1223232~1212~0~0~~ABDP~1234~asda |
# |12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda|
# |1223232~1212~0~0~~ABDP~1234~asda |
# |2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda |
# |1223232~1212~0~0~~ABDP~1234~asda |
# | |
# +-------------------------------------------------------+发布于 2021-05-21 19:15:05
我正在设法按文件分区执行您的数据,但这也需要一个大的集群,因为您提到过它有超过30 by的文件。
from pyspark.sql.window import Window
import pyspark.sql.functions as f
df = spark.read.csv('your_path', schema='value string')
df = df.withColumn('filename', f.input_file_name())
df = df.repartition('filename')
df = df.withColumn('index', f.monotonically_increasing_id())
w = Window.partitionBy('filename').orderBy('index')
df = df.withColumn('group', f.sum(f.lag('value', default=False).over(w).endswith(f.lit('|##|')).cast('int')).over(w))
df = df.withColumn('value', f.regexp_replace('value', '\|\*\|', '~'))
df = df.withColumn('value', f.regexp_replace('value', '\|##\|', ''))
df = df.groupBy('group').agg(f.concat_ws('', f.collect_list('value')).alias('value'))
(df
.select('value')
.sort('group')
.show(truncate=False))输出
+-------------------------------------------------------+
|value |
+-------------------------------------------------------+
|1223232~1212~0~0~~ABDP~1234~asda |
|12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda|
|1223232~1212~0~0~~ABDP~1234~asda |
|2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda |
|1223232~1212~0~0~~ABDP~1234~asda |
+-------------------------------------------------------+https://stackoverflow.com/questions/67579000
复制相似问题