我是DataBricks的新手。我的任务是读取大量大型CSV文件(大小高达1G),并验证和清理所有准备好进行polybase读取到Azure DW中的字段。这些文件存储在blob中。
我认为DatBricks和Python是一种能够产生合理性能的方法。
我使用如下所示的示例QuickStart作为起点:https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal
我想在每个字段上执行一些清理替换,并运行正则表达式,以过滤掉任何其他不需要的字符,最后修剪以删除尾随空格。我已经在下面包含了一个测试示例片段,它给出了我希望执行的验证类型的风格。此示例使用udf来转换值,然后使用正则表达式来过滤不需要的字符,如链接中所示。
import pyspark.sql.functions as f
def udf_clean (s):
return (f.translate(s,'3','B'))
df.filter(df.category=='Housing').select(df[1],f.trim(f.regexp_replace(udf_clean(df[1]),'(\d+)',''))).show()我找不到的是如何在整个数据帧上执行这些转换。我想一次性清理整个数据帧。因为它是基于向量的,所以我觉得我不应该一次一行地迭代它,而是在整体上执行某种类型的操作。我知道如何迭代行,如下所示
`for row in df.rdd.collect():
do_something(row)`..but我觉得我应该能够对整个字段集做一些更有效率的事情。这个想法是正确的吗?有没有人举个例子?非常感谢,Richard
结果代码,但不是答案
我还没有找到这个问题的答案,但我想我会发布我的代码,正如你将看到的,它并不优雅,但工作正常。
from pyspark.sql import functions as f
from pyspark.sql.functions import regexp_replace, udffrom pyspark.sql.functions import translate, udf
from pyspark.sql.functions import trim, udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
def udf_regexclean(s):
return trim(regexp_replace(s,'([^\p{L}\p{Nd} ''@.():_*\-&+\/,])',''))
def udf_regexReplace(s):
return regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(s,'£','GBR'),'’',''),' ',''),"'t",''),'É', 'E')
df1=df.select( udf_regexclean(udf_regexReplace(df[0 ]))
,udf_regexclean(udf_regexReplace(df[1 ]))
,udf_regexclean(udf_regexReplace(df[2 ]))
,udf_regexclean(udf_regexReplace(df[3 ]))
,udf_regexclean(udf_regexReplace(df[4 ]))
,udf_regexclean(udf_regexReplace(df[5 ]))
,udf_regexclean(udf_regexReplace(df[6 ]))
,udf_regexclean(udf_regexReplace(df[7 ]))
,udf_regexclean(udf_regexReplace(df[8 ]))
,udf_regexclean(udf_regexReplace(df[9 ]))
,udf_regexclean(udf_regexReplace(df[10 ]))
,udf_regexclean(udf_regexReplace(df[11 ]))
,udf_regexclean(udf_regexReplace(df[12 ]))
,udf_regexclean(udf_regexReplace(df[13 ]))
,udf_regexclean(udf_regexReplace(df[14 ]))
,udf_regexclean(udf_regexReplace(df[15 ]))
,udf_regexclean(udf_regexReplace(df[16 ]))
,udf_regexclean(udf_regexReplace(df[17 ]))
,udf_regexclean(udf_regexReplace(df[18 ]))
,udf_regexclean(udf_regexReplace(df[19 ]))
,udf_regexclean(udf_regexReplace(df[20 ]))
,udf_regexclean(udf_regexReplace(df[21 ]))
,udf_regexclean(udf_regexReplace(df[22 ]))
,udf_regexclean(udf_regexReplace(df[23 ]))
,udf_regexclean(udf_regexReplace(df[24 ]))
,udf_regexclean(udf_regexReplace(df[25 ]))
,udf_regexclean(udf_regexReplace(df[26 ]))
,udf_regexclean(udf_regexReplace(df[27 ]))
,udf_regexclean(udf_regexReplace(df[28 ]))
,udf_regexclean(udf_regexReplace(df[29 ]))
,udf_regexclean(udf_regexReplace(df[30 ]))
,udf_regexclean(udf_regexReplace(df[31 ]))
,udf_regexclean(udf_regexReplace(df[32 ]))
)
df2=df1.withColumn('ScrapedFilename',lit(blob_filename))理查德
发布于 2020-01-20 17:43:51
作为示例,我创建了一个简单的示例来实现您的需求,而不是替换,只是通过在旧数据帧的RDD上应用UDF函数来创建新的数据帧。
首先,我创建了一个简单的数据帧,如下面的代码和图所示。
import numpy as np
import pandas as pd
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
sparkDF=spark.createDataFrame(df)
display(sparkDF)

然后,为每行定义一个函数,如下面的代码和图所示。
def udf_clean(row):
return (row[0] > 0 and True or False, row[1]+2, row[2]*2, row[3]**4)
new_rdd = sparkDF.rdd.map(lambda row: udf_clean(row))
new_sparkDF = spark.createDataFrame(new_rdd, list('ABCD'))
display(new_sparkDF)

希望能有所帮助。
https://stackoverflow.com/questions/59795082
复制相似问题