我正在编写PySpark代码,其中我有10个查找表,对于每个查找表,我定义了一个结构,然后定义了一个模式。然后,我为每个查找表创建一个DF,并在最后使用它们与一个主表连接。我知道如何编码,但是有人能指导我如何构造代码吗?我是Python新手,所以不知道如何用PySpark组织我的代码。也许可以和我分享一些样例产品PySpark代码?谢谢!
发布于 2020-08-27 22:37:19
为了恢复和管理您的代码,您可以在不同的类中定义不同的部分。我的方法是创建一个ini或yaml作为参考。此外,您还可以在测试和生产环境中处理输入变量。
例如:
主类:
from sparkSchema import SparkSchema
from configparser import ConfigParser
if __name__ == "__main__":
config_path = './config.ini'
config = ConfigParser()
config.optionxform = str
config.read(config_path)
#initialize your app, create session and context, etc.
#also you can handle this part with some class.
streamDFWithSchema = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()
# rest of the code最好将每个输入变量(例如SparkSchema类中的模式)定义为一个输入变量,并且可以使用argpars库。
SparkSchema类:
from pyspark.sql.functions import to_timestamp
class SparkSchema:
def __init__(self, DF, config, section):
self.DF =DF
self.config = config
self.section = section
def getDFWithSchema(self):
self.DF = self.DF \
.selectExpr(
'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['grouped.column.index']+
'] AS STRING) as '+self.config[self.section]['grouped.column.name'] \
,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
+self.config[self.section]['date.column.index']+
'] AS STRING) as '+self.config[self.section]['date.column.name'])
self.DF = self.DF\
.withColumn('EventDate',
to_timestamp(self.config[self.section]['date.column.name']
, self.config[self.section]['date.column.format']))
self.DF.printSchema()
return self.DF.ini文件:
.
.
.
[Schema]
message.delimiter=\\\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\\\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.并且您应该在您的spark-submit命令中将库configparser作为--py-files添加:
$ spark-submit --jars some.jar,jar.file\
--py-files configparser.zip,argpars.zip\
main_class.pyhttps://stackoverflow.com/questions/40029144
复制相似问题