首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark SCD 1型

PySpark SCD 1型
EN

Code Review用户
提问于 2021-04-27 15:35:44
回答 2查看 594关注 0票数 3

我在Azure DataBricks中使用DataBricks来尝试创建SCD 1。

我想知道这是否一种有效的做法?

下面是我的SQL表:

代码语言:javascript
复制
CREATE TABLE dbo.Countries (
  CountryId bigint IDENTITY,
  ShortName nvarchar(max) NULL,
  FullName nvarchar(max) NULL,
  CapitalCities nvarchar(max) NOT NULL,
  IsActive bit NOT NULL DEFAULT (0),
  CreatedOn datetime2 NOT NULL DEFAULT (getdate()),
  UpdatedOn datetime2 NULL,
  PRIMARY KEY CLUSTERED (CountryId)
)

这里是我的Python:

代码语言:javascript
复制
# Import Python Modules
import requests
import json
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import md5, concat_ws

# Get Random Data From API
url = 'https://raw.githubusercontent.com/mledoze/countries/master/countries.json'
response = requests.get(url)
rdd_countries = sc.parallelize([response.text])
df_countries = spark.read.option("multiline","true") \
                         .json(rdd_countries)

# Remove Unwanted Columns
df_countries = df_countries.select(F.col("name.common").alias("ShortName"), F.col("name.official").alias("FullName"), F.col("capital").alias("CapitalCities"))

# Transform CapitalCities From Map To String
df_countries = df_countries.withColumn("CapitalCities", F.concat_ws(",", F.col("CapitalCities")))

# Add MD5 Hash For Row Comparison
df_countries = df_countries.withColumn("ActiveRowHash", md5(concat_ws('|',df_countries.ShortName,df_countries.FullName,df_countries.CapitalCities)))

# Rename All Source Columns
df_countries = df_countries.select([F.col(c).alias("Source_" + c) for c in df_countries.columns])

# Show Source Data
df_countries.show()

# SQL Server Config
target_server = "x.database.windows.net"
target_database = "x"
target_username = "x"
target_password = "x"
target_table = "dbo.Countries"

# Get Target Data From SQL
df_target = spark.read.format("jdbc") \
                  .option("url", f"jdbc:sqlserver://{target_server};databaseName={target_database};") \
                  .option("dbtable", target_table) \
                  .option("user", target_username) \
                  .option("password", target_password) \
                  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                  .load()

# Add MD5 Hash For Row Comparison
df_target = df_target.withColumn("CurrentRowHash", md5(concat_ws('|',df_target.ShortName,df_target.FullName,df_target.CapitalCities)))

# Rename All Target Columns
df_target = df_target.select([F.col(c).alias("Target_" + c) for c in df_target.columns])

# Show Data
df_target.show()

# DataFrame For New Rows
df_new = df_countries.join(df_target, df_countries.Source_ShortName == df_target.Target_ShortName, "leftanti")
df_new.show()

# DataFrame For Deleted Rows
df_deleted = df_target.join(df_countries, df_target.Target_ShortName == df_countries.Source_ShortName, "leftanti")
df_deleted.show()

# DataFrame For Updated Rows
df_updated = df_countries.join(df_target, (df_countries.Source_ShortName == df_target.Target_ShortName) & (df_countries.Source_ActiveRowHash != df_target.Target_CurrentRowHash), "inner")
df_updated.show()

从这里开始,我将循环遍历三个DataFrames,并根据需要更新SQL。

我知道我做了一个假设源ShortName是关键,但这是可以的。

显然,我已经测试了代码的功能,而且它看起来很有效,但这是最佳实践吗?

EN

回答 2

Code Review用户

发布于 2021-05-16 10:17:28

关于sql表和data types的一些观察。

在大多数关系数据库中,尤其是Server中,选择适当的数据类型是很重要的。

选择不适当的数据类型,或者懒得做字符串长度之类的事情,可能会导致意想不到的和不必要的性能下降。应该选择数据类型,以便它们能够处理预期的数据,但不会有更多的数据。

为什么?表中的行以称为页的结构排列,每个页的大小约为8K,无论是在磁盘上还是在内存中。页面是Server读取的最小数据量--要读取一行或多行,包含该行的页(S)是从磁盘读取的。每种数据类型越紧凑,单个页面上可以容纳的行就越多,读取一系列行所需的IO就越少。

您有一个名为Countries的表--大概这个表存储了一个国家列表,目前全世界有195个国家。Bigint不是CountryId的正确数据类型--除非您期望至少需要2,147,483,647行。TinyInt将是正确的数据类型,其可能值范围为0-255,每行使用一个字节,而不是8个字节。

datetime2的使用也是如此--您需要提供一个精度,比如使用6个字节存储的datetime2(1),默认的是使用8个字节的最大精度--您真的需要以1亿分之一秒的精度存储时间吗?

为什么所有的字符串数据类型都是nvarchar(max)?根据谷歌的数据,世界上最长的国家名是“大不列颠及北爱尔兰联合王国”,有56个字符。更合适的数据类型是nvarchar(60)

同样,短名称可能更短,同样适用于CapitalCities

为什么要为(n)varchar列指定适当的长度呢?根据定义,它们是可变的,所以只需要使用足够的空间来存储每个值,对吗?

这是正确的--但是有一个隐藏的,不那么明显的开销。

当Server构建执行计划时,它不仅包含执行查询所需的所有信息,如用于访问表、执行联接、排序、聚合等的操作符,还包含各种元数据和有关Server希望看到的数据的状态。其中之一是估计每一行将返回多少字节。

显然,在执行查询之前,Server无法预先知道特定列中的数据,但是执行查询所需的所有操作都必须事先预先准备好。

所有执行的查询都分配了最小的内存量来运行,有些操作需要自己的额外内存分配。

许多数据类型(例如int )都是固定长度的,而且没有歧义-- Server知道每一行的每个int列都是4个字节,因此结合它所知道的要处理的行数,可以很好地估计需要请求的内存数量。

但是,对于varchar数据类型,Server不知道,因此它假设每个列平均包含声明大小的50% (外加少量开销)。基于此,以及预期将处理多少行(其本身由数据的基数和统计信息决定),Server将请求足够的内存来处理这些估计的数据。

这可能导致查询请求和获取不成比例的大量内存来运行,最终不需要,但也不能用于所有其他并发执行的查询。

此外,选择Nvarchar作为数据类型意味着您希望需要unicode字符。这可能是这样的情况,在这种情况下,这是好的,因为你没有选择。

但是,请注意,每个字符需要在磁盘和内存中存储两个字节,而不是varchar's的一个。这可能会导致上述内存需求的膨胀,还意味着每个8K页上可以容纳的数据行数减少,导致与varchar一样读取相同数量的数据的IO更多,最终导致执行查询的速度变慢。

最近,我向同事演示了这如何影响查询性能。

我比较了两个简单的客户信息表,一个列的名称、地址和电子邮件大小适当,例如使用varchar(50),另一个列都是nvarchar(max)

我演示了各种人为的排序、过滤、字符串操作、窗口函数等操作。对于两个包含相同数据的表,nvarchar(max)表上相同查询的性能要慢600%,并且执行6gb以上的IO,而只有4mb。

因此,方法是始终正确地调整列的大小以满足预期的数据--这只是一个很好的实践!

票数 1
EN

Code Review用户

发布于 2021-04-27 17:25:25

事物的随机散落:

  • CapitalCities是可疑的,并提出了一个不恰当的规范化模式。如果一个国家需要代表多个首都,人们可能会期待一个单独的带有外键的表格。无论是这个还是这个名字都是错误的,你只需要代表每个国家的一个首都,在这种情况下,就是CapitalCity
  • 显然,您使用的是Microsoft,它公然违反了SQL标准。例如,boolean标准SQL类型,但MS 强迫你使用 bit。如果您在这方面有任何选择,请考虑迁移到符合标准的RDBMS。
  • 将全局代码移动到子程序中。
  • 通过response.raise_for_status()检查请求失败,而不是假设成功

另外,对于像这样的“流利”语法:

代码语言:javascript
复制
df_target = spark.read.format("jdbc") \
              .option("url", f"jdbc:sqlserver://{target_server};databaseName={target_database};") \
              .option("dbtable", target_table) \
              .option("user", target_username) \
              .option("password", target_password) \
              .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
              .load()

反而更喜欢

代码语言:javascript
复制
df_target = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:sqlserver://{target_server};databaseName={target_database};")
    .option("dbtable", target_table)
    .option("user", target_username)
    .option("password", target_password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/260076

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档