首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark代码结构

PySpark代码结构
EN

Stack Overflow用户
提问于 2016-10-14 03:14:41
回答 1查看 929关注 0票数 1

我正在编写PySpark代码,其中我有10个查找表,对于每个查找表,我定义了一个结构,然后定义了一个模式。然后,我为每个查找表创建一个DF,并在最后使用它们与一个主表连接。我知道如何编码,但是有人能指导我如何构造代码吗?我是Python新手,所以不知道如何用PySpark组织我的代码。也许可以和我分享一些样例产品PySpark代码?谢谢!

EN

回答 1

Stack Overflow用户

发布于 2020-08-27 22:37:19

为了恢复和管理您的代码,您可以在不同的类中定义不同的部分。我的方法是创建一个iniyaml作为参考。此外,您还可以在测试和生产环境中处理输入变量。

例如:

主类:

代码语言:javascript
复制
from sparkSchema import SparkSchema
from configparser import ConfigParser 

if __name__ == "__main__":

    config_path = './config.ini'
    config = ConfigParser()
    config.optionxform = str
    config.read(config_path)

#initialize your app, create session and context, etc. 
#also you can handle this part with some class.

streamDFWithSchema  = SparkSchema(streamDF, config, 'Schema').getDFWithSchema()

# rest of the code

最好将每个输入变量(例如SparkSchema类中的模式)定义为一个输入变量,并且可以使用argpars库。

SparkSchema类:

代码语言:javascript
复制
from pyspark.sql.functions import to_timestamp

class SparkSchema:
    
    def __init__(self, DF, config, section):
        self.DF =DF
        self.config = config
        self.section = section


    def getDFWithSchema(self):
        
        self.DF  = self.DF \
      .selectExpr(
   'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
   +self.config[self.section]['grouped.column.index']+
   '] AS STRING) as '+self.config[self.section]['grouped.column.name'] \
   ,'CAST(split(KafkaValue,"'+self.config[self.section]['message.delimiter']+'")['
   +self.config[self.section]['date.column.index']+
   '] AS STRING) as '+self.config[self.section]['date.column.name'])

        self.DF = self.DF\
            .withColumn('EventDate',
            to_timestamp(self.config[self.section]['date.column.name']
            , self.config[self.section]['date.column.format']))
        self.DF.printSchema()
        return self.DF

.ini文件:

代码语言:javascript
复制
.
.
.
[Schema]
message.delimiter=\\\|
grouped.column.name=EmployeeId
grouped.column.index=61
date.column.name=END_DATE
date.column.index=12
date.column.format=yyyyMMddHHmmss
[SchemaLocal]
message.delimiter=\\\|
grouped.column.name=EmployeeId
grouped.column.index=2
date.column.name=END_DATE
date.column.index=3
date.column.format=yyyy-MM-dd HHmmss
.
.
.

并且您应该在您的spark-submit命令中将库configparser作为--py-files添加:

代码语言:javascript
复制
$ spark-submit --jars some.jar,jar.file\
  --py-files configparser.zip,argpars.zip\
  main_class.py
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40029144

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档