有更好的方法来优化下面的笔记本吗?目前,它需要2分20秒才能运行。我怎样才能提高性能?如有任何建议,将不胜感激。谢谢。
环境:
设置环境变量
environment = "mydatalake"
fileFormat = "parquet"函数-设置从何处加载源文件的路径。
tblName = ""
fldrName = ""
dbName = ""
filePrefix = ""
# Create the function
def fn_PathSource(fldrName,dbName,tblName,fileFormat,filePrefix):
str_path0 = "spark.read.load("
str_path1 = "'abfss://"
str_path2 = ".dfs.core.windows.net/sources"
str_path3 = ", format="
return f"{str_path0}{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/{dbName}{filePrefix}{tblName}.{fileFormat}'{str_path3}'{fileFormat}')"函数-设置表数据将存储在数据集中的路径。
# Create the variables used by the function
tblName = ""
fldrName = ""
dbName = ""
# Create the function
def fn_Path(fldrName,dbName,tblName):
str_path1 = "abfss://"
str_path2 = ".dfs.core.windows.net"
return f"{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/"函数-获取记录的最新版本。
import hashlib
from pyspark.sql.functions import md5, concat_ws,col
# Create the variables used by the function
uniqueId = ""
versionId = ""
tblName = ""
# Create the function
def fn_ReadLatestVrsn(uniqueId,versionId,tblName):
df_Max = spark.sql(f"SELECT {uniqueId},MAX({versionId}) AS {versionId}Max FROM {tblName} GROUP BY {uniqueId}")
df_Max.createOrReplaceTempView(f"{tblName}Max")
df_Latest = spark.sql(f"SELECT {uniqueId},{versionId}Max FROM {tblName}Max")
df_Latest = df_Latest.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}Max").cast("string"))))
df_Latest.createOrReplaceTempView(f"{tblName}Latest")
df_Hash = spark.sql(f"SELECT * FROM {tblName} t1")
df_Hash = df_Hash.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}").cast("string"))))
df_Hash.createOrReplaceTempView(f"{tblName}Hash")
df_Final = spark.sql(f"SELECT DISTINCT t1.* FROM {tblName}Hash t1 INNER JOIN {tblName}Latest t2 ON t1.HashKey = t2.HashKey")
df_Final.createOrReplaceTempView(f"{tblName}")
return spark.sql(f"SELECT * FROM {tblName}")用源表数据加载数据帧
DF_tblBitSize = eval(fn_PathSource("silver","MineDB","tblBitSize","parquet","_dbo_"))
DF_tblDailyReport = eval(fn_PathSource("silver","MineDB","tblDailyReport","parquet","_dbo_"))
DF_tblDailyReportHole = eval(fn_PathSource("silver","MineDB","tblDailyReportHole","parquet","_dbo_"))
DF_tblDailyReportHoleActivity = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivity","parquet","_dbo_"))
DF_tblDailyReportHoleActivityHours = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivityHours","parquet","_dbo_"))
DF_tblDailyReportShift = eval(fn_PathSource("silver","MineDB","tblDailyReportShift","parquet","_dbo_"))
DF_tblDrill = eval(fn_PathSource("silver","MineDB","tblDrill","parquet","_dbo_"))
DF_tblDrillType = eval(fn_PathSource("silver","MineDB","tblDrillType","parquet","_dbo_"))
DF_tblEmployee = eval(fn_PathSource("silver","MineDB","tblEmployee","parquet","_dbo_"))
DF_tblHole = eval(fn_PathSource("silver","MineDB","tblHole","parquet","_dbo_"))
DF_tblMineProject = eval(fn_PathSource("silver","MineDB","tblMineProject","parquet","_dbo_"))
DF_tblShift = eval(fn_PathSource("silver","MineDB","tblShift","parquet","_dbo_"))
DF_tblUnit = eval(fn_PathSource("silver","MineDB","tblUnit","parquet","_dbo_"))
DF_tblUnitType = eval(fn_PathSource("silver","MineDB","tblUnitType","parquet","_dbo_"))
DF_tblWorkSubCategory = eval(fn_PathSource("silver","MineDB","tblWorkSubCategory","parquet","_dbo_"))
DF_tblWorkSubCategoryType = eval(fn_PathSource("silver","MineDB","tblWorkSubCategoryType","parquet","_dbo_"))
DF_v_Dashboards_CompanyContracts= eval(fn_PathSource("silver","MineDB","v_Dashboards_CompanyContracts","parquet","_"))
DF_v_DailyReportShiftDrillers = eval(fn_PathSource("silver","MineDB","v_DailyReportShiftDrillers","parquet","_"))
DF_v_ActivityCharges = eval(fn_PathSource("silver","MineDB","v_ActivityCharges","parquet","_"))将dataframes转换为可在SQL中使用的临时视图
DF_tblBitSize.createOrReplaceTempView("tblBitSize")
DF_tblDailyReport.createOrReplaceTempView("tblDailyReport")
DF_tblDailyReportHole.createOrReplaceTempView("tblDailyReportHole")
DF_tblDailyReportHoleActivity.createOrReplaceTempView("tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours.createOrReplaceTempView("tblDailyReportHoleActivityHours")
DF_tblDailyReportShift.createOrReplaceTempView("tblDailyReportShift")
DF_tblDrill.createOrReplaceTempView("tblDrill")
DF_tblDrillType.createOrReplaceTempView("tblDrillType")
DF_tblEmployee.createOrReplaceTempView("tblEmployee")
DF_tblHole.createOrReplaceTempView("tblHole")
DF_tblMineProject.createOrReplaceTempView("tblMineProject")
DF_tblShift.createOrReplaceTempView("tblShift")
DF_tblUnit.createOrReplaceTempView("tblUnit")
DF_tblUnitType.createOrReplaceTempView("tblUnitType")
DF_tblWorkSubCategory.createOrReplaceTempView("tblWorkSubCategory")
DF_tblWorkSubCategoryType.createOrReplaceTempView("tblWorkSubCategoryType") DF_v_Dashboards_CompanyContracts.createOrReplaceTempView("v_Dashboards_CompanyContracts")
DF_v_DailyReportShiftDrillers.createOrReplaceTempView("v_DailyReportShiftDrillers")
DF_v_ActivityCharges.createOrReplaceTempView("v_ActivityCharges")将最新数据加载到视图中
当源系统表中的现有记录被更新(或发生软删除)时,Azure Data将通过创建增量式地板文件来捕获该更改。在创建新记录时也会发生同样的情况。在合并过程中,所有增量文件都合并到一个拼花文件中。对于已更新的现有记录(或发生了软删除),合并将创建该记录的两个版本,并附加最新版本。如果要查询合并的拼花文件,您将看到一个重复的记录。因此,要只看到该记录的最新版本,我们需要删除以前的版本。这个功能将确保我们正在查看所有记录的最新版本。
**特别注意:对于没有软删除记录的表(例如没有LastModDateTime或ActiveInd列的表),这种逻辑是不必要的,因此,我们不对这些表应用此功能。
DF_tblBitSize = fn_ReadLatestVrsn("BitSizeID","LastModDateTime","tblBitSize")
DF_tblDailyReport = fn_ReadLatestVrsn("DailyReportID","LastModDateTime","tblDailyReport")
DF_tblDailyReportHole = fn_ReadLatestVrsn("DailyReportHoleID","LastModDateTime","tblDailyReportHole")
DF_tblDailyReportHoleActivity = fn_ReadLatestVrsn("DailyReportHoleActivityID","LastModDateTime","tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours = fn_ReadLatestVrsn("DailyReportHoleActivityHoursID","LastModDateTime","tblDailyReportHoleActivityHours")
DF_tblDailyReportShift = fn_ReadLatestVrsn("DailyReportShiftID","LastModDateTime","tblDailyReportShift")
DF_tblDrill = fn_ReadLatestVrsn("DrillID","LastModDateTime","tblDrill")
DF_tblEmployee = fn_ReadLatestVrsn("EmployeeID","LastModDateTime","tblEmployee")
DF_tblHole = fn_ReadLatestVrsn("HoleID","LastModDateTime","tblHole")
DF_tblMineProject = fn_ReadLatestVrsn("MineProjectID","LastModDateTime","tblMineProject")
DF_tblShift = fn_ReadLatestVrsn("ShiftID","LastModDateTime","tblShift")
DF_tblWorkSubCategoryType = fn_ReadLatestVrsn("WorkSubCategoryTypeID","LastModDateTime","tblWorkSubCategoryType")CTE_UnitConversion
%%sql
CREATE OR REPLACE TEMP VIEW CTE_UnitConversion AS
(
SELECT
u.UnitID
,ut.UnitType
,u.UnitName
,u.UnitAbbr
,COALESCE(CAST(u.Conversion AS FLOAT),1) AS Conversion
FROM
tblUnit u
INNER JOIN tblUnitType ut
ON u.UnitTypeID = ut.UnitTypeID
AND ut.UnitType IN ('Distance','Depth')
UNION
SELECT
-1 AS UnitID
,'Unknown' AS UnitType
,'Unknown' AS UnitName
,'Unknown' AS UnitAbbr
,1 AS Conversion
)CTE_Dashboards_BaseData
%%sql
CREATE OR REPLACE TEMP VIEW CTE_Dashboards_BaseData AS
(
SELECT
CC.ContractID,
CC.ProjectID,
CAST(DR.ReportDate AS DATE) AS ReportDate,
D.DrillID,
CAST(D.DrillName AS STRING) AS DrillName,
DT.DrillTypeID,
CAST(DT.DrillType AS STRING) AS DrillType,
CAST(NULL AS STRING) AS HoleName,
CAST(S.ShiftName AS STRING) AS ShiftName,
STRING(CONCAT(E.LastName,' ',E.FirstName)) AS Supervisor,
CAST(DRSD.Drillers AS STRING) AS Driller,
CAST(NULL AS FLOAT) AS TotalMeterage,
CAST(NULL AS FLOAT) AS Depth,
CAST(NULL AS STRING) AS DepthUnit,
CAST(NULL AS FLOAT) AS ManHours,
CAST(NULL AS FLOAT) AS Payrollhours,
CAST(NULL AS FLOAT) AS ActivityHours,
CAST(NULL AS FLOAT) AS EquipmentHours,
CAST(NULL AS FLOAT) AS Quantity,
CAST(NULL AS STRING) AS Category,
CAST(NULL AS STRING) AS SubCategory,
CAST(NULL AS STRING) AS HoursType,
CAST(NULL AS STRING) AS BitSize,
CAST(DRS.DailyReportShiftID AS BIGINT) AS DailyReportShiftID,
CAST(DRS.ShiftID AS INT) AS ShiftID,
CAST(NULL AS TIMESTAMP) AS CompleteDateTime,
CAST(NULL AS STRING) AS HoleCompletionStatus,
CAST(NULL AS STRING) AS Notes,
CAST(NULL AS INT) AS HoleID,
CAST(NULL AS FLOAT) AS DistanceFrom,
CAST(NULL AS FLOAT) AS DistanceTo,
CAST(NULL AS STRING) AS DistanceFromToUnit,
CAST(NULL AS FLOAT) AS Distance,
CAST(NULL AS STRING) AS DistanceUnit,
CAST(NULL AS STRING) AS FluidUnit,
CAST(NULL AS FLOAT) AS FluidVolume,
CAST(NULL AS STRING) AS UID,
CAST(NULL AS FLOAT) AS MaxDepth,
CAST(NULL AS FLOAT) AS Penetration,
CAST(NULL AS FLOAT) AS Charges,
CAST(DR.Status AS STRING) AS Status,
CAST(DRS.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
v_Dashboards_CompanyContracts CC
LEFT JOIN tblDailyReport DR ON CC.ContractID = DR.ContractID AND CC.ProjectID = DR.ProjectID
LEFT JOIN tblDailyReportShift DRS ON DR.DailyReportID = DRS.DailyReportID
LEFT JOIN tblShift S ON DRS.ShiftID = S.ShiftID
LEFT JOIN tblDrill D ON DR.DrillID = D.DrillID
LEFT JOIN tblDrillType DT ON D.DrillTypeID = DT.DrillTypeID
LEFT JOIN tblEmployee E ON DRS.SupervisorID = E.EmployeeID
LEFT JOIN v_DailyReportShiftDrillers DRSD ON DRS.DailyReportShiftID = DRSD.DailyReportShiftID
WHERE
DR.Status <> 'Deleted'
)CTE_DailyReportHoleActivityManHours
%%sql
CREATE OR REPLACE TEMP VIEW CTE_DailyReportHoleActivityManHours AS
(
SELECT
DailyReportHoleActivityID
,SUM(HoursAsFloat) AS ManHours
FROM
tblDailyReportHoleActivityHours
WHERE
ActiveInd = 'Y'
GROUP BY
DailyReportHoleActivityID
)活动费用
%%sql
CREATE OR REPLACE TEMP VIEW SECTION_1 AS
(
SELECT
BD.ContractID
,BD.ProjectID
,CAST(ReportDate AS DATE) AS ReportDate
,DrillID
,DRHA.Depth
,DPU.UnitAbbr AS DepthUnit
,DPU.UnitID AS DepthUnitID
,DRHAMH.ManHours
,DRHA.ActivityHoursAsFloat AS ActivityHours
,WSC.WorkSubCategoryName AS Category
,WSCT.TypeName AS SubCategory
,CASE
WHEN (COALESCE(AC.Charges,0) = 0 AND COALESCE(AC.BillableCount, 0) = 0) OR DRHA.Billable='N' THEN 'Non-Billable'
WHEN AC.DefinedRateName IS NOT NULL AND DRHA.Billable <> 'N' THEN AC.DefinedRateName
ELSE WSC.WorkSubCategoryName
END AS HoursType
,BS.BitSizeID AS BitSizeID
,BS.BitSize
,DRHA.BitID AS BitID
,BD.DailyReportShiftID
,DRHA.Notes
,H.HoleID
,DRHA.DistanceFrom
,DRHA.DistanceTo
,DFU.UnitAbbr AS DistanceFromToUnit
,DFU.UnitID AS DistanceFromToUnitID
,DRHA.Distance
,DU.UnitID AS DistanceUnitID
,CASE
WHEN WSC.WorkCategoryId = 1 THEN MAX(COALESCE(DRHA.DistanceTo, 0)) OVER ( PARTITION BY H.HoleID, WSC.WorkSubCategoryName ORDER BY H.HoleID, ReportDate, BD.ShiftID, DRHA.SequenceNumber, DRHA.CreateDateTime, DRHA.DistanceTo)
ELSE NULL
END AS MaxDepth
,CASE
WHEN WSC.WorkCategoryId = 1 THEN DRHA.Penetration
ELSE 0
END AS Penetration
,COALESCE(AC.Charges,0) AS Charges
,BD.Status
,H.MineProjectID
,CAST(DRHA.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
CTE_Dashboards_BaseData BD
INNER JOIN tblDailyReportHole DRH ON BD.DailyReportShiftID = DRH.DailyReportShiftID
INNER JOIN tblDailyReportHoleActivity DRHA ON DRH.DailyReportHoleID = DRHA.DailyReportHoleID
INNER JOIN tblWorkSubCategory WSC ON DRHA.WorkSubCategoryID = WSC.WorkSubCategoryID
LEFT JOIN tblHole H ON DRH.HoleID = H.HoleID
LEFT JOIN tblBitSize BS ON DRHA.BitSizeID = BS.BitSizeID
LEFT JOIN tblUnit DPU ON DRHA.DepthUnitID = DPU.UnitID
LEFT JOIN tblUnit DFU ON DRHA.DistanceFromToUnitID = DFU.UnitID
LEFT JOIN tblUnit DU ON DRHA.DistanceUnitID = DU.UnitID
LEFT JOIN tblWorkSubCategoryType WSCT ON DRHA.TypeID = WSCT.WorkSubCategoryTypeID
LEFT JOIN v_ActivityCharges AC ON DRHA.DailyReportHoleActivityID = AC.DailyReportHoleActivityID
LEFT JOIN CTE_DailyReportHoleActivityManHours DRHAMH ON DRHA.DailyReportHoleActivityID = DRHAMH.DailyReportHoleActivityID
WHERE
DRH.ActiveInd = 'Y'
AND DRHA.ActiveInd = 'Y'
)创建FACT_Activity表
df = spark.sql("""
SELECT
ReportDate
,DrillingCompanyID
,MiningCompanyID
,DrillID
,ProjectID
,ContractID
,LocationID
,HoleID
,DailyReportShiftId
,MineProjectID
,BitID
,TRIM(UPPER(BitSize)) AS BitSize
,-1 AS TimesheetId
,CurrencyID
,TRIM(UPPER(Category)) AS Category
,TRIM(UPPER(SubCategory)) AS SubCategory
,TRIM(UPPER(HoursType)) AS HoursType
,TRIM(UPPER(Notes)) AS Notes
,ApprovalStatus
,Depth AS Depth
,(Depth/COALESCE(Depth.Conversion,1)) AS DepthMeters
,Manhours
,ActivityHours
,DistanceFrom
,DistanceTo
,Distance
,Penetration
,(DistanceFrom/Distance.Conversion) AS DistanceFromMeters
,(DistanceTo/Distance.Conversion) AS DistanceToMeters
,(Distance/Distance.Conversion) AS DistanceMeters
,(Penetration/Distance.Conversion) AS PenetrationMeters
,DepthUnitID
,DistanceFromToUnitID
,Charges
,LastModDateTime
,ReportApprovalRequired
FROM
(
SELECT
COALESCE(CAST(ReportDate AS DATE),'01/01/1900') AS ReportDate
,COALESCE(DrillingCompanyID,-1) AS DrillingCompanyID
,COALESCE(MiningCompanyID,-1) AS MiningCompanyID
,COALESCE(DrillID,-1) AS DrillID
,COALESCE(C.ProjectID, -1) AS ProjectID
,COALESCE(C.ContractID,-1) AS ContractID
,COALESCE(C.LocationID,-1) AS LocationID
,COALESCE(HoleID,-1) AS HoleID
,COALESCE(DailyReportShiftID,-1) AS DailyReportShiftId
,COALESCE(MP.MineProjectID,-1) AS MineProjectID
,COALESCE(BitID,-1) AS BitID
,COALESCE(BitSize,'UNKNOWN') AS BitSize
,COALESCE(DepthUnitID,-1) AS DepthUnitID
,COALESCE(DistanceFromToUnitID,-1) AS DistanceFromToUnitID
,COALESCE(DistanceUnitID,-1) AS DistanceUnitID
,COALESCE(C.CurrencyID,-1) AS CurrencyID
,COALESCE(Category,'Unknown') AS Category
,COALESCE(SubCategory,'UNKNOWN') AS SubCategory
,COALESCE(HoursType,'UNKNOWN') AS HoursType
,SUBSTRING(Notes,0,250) AS Notes
,COALESCE(U.Status,'Unknown') AS ApprovalStatus
,COALESCE(Depth,0) AS Depth
,COALESCE(Manhours,0) AS Manhours
,COALESCE(ActivityHours,0) AS ActivityHours
,COALESCE(DistanceFrom,0) AS DistanceFrom
,COALESCE(DistanceTo,0) AS DistanceTo
,COALESCE(Distance,0) AS Distance
,COALESCE(Penetration,0) AS Penetration
,COALESCE(Charges,0) AS Charges
,COALESCE(CAST(U.LastModDateTime AS TIMESTAMP),'1900/01/01 00:00:00') AS LastModDateTime
,C.ReportApprovalRequired
FROM
SECTION_1 U
LEFT JOIN v_Dashboards_CompanyContracts C ON U.ContractID = C.ContractID AND COALESCE(U.ProjectID,-1) = C.ProjectID
LEFT JOIN tblMineProject MP ON U.MineProjectID = MP.MineProjectID AND MP.ActiveInd = 'Y'
) TBL1
INNER JOIN CTE_UnitConversion Distance ON tbl1.DistanceFromToUnitID = Distance.UnitID
INNER JOIN CTE_UnitConversion Depth ON tbl1.DepthUnitID = Depth.UnitID
""")创建表并写入数据集
tblName = "fact_activity"
fldrName = "myfolder"
dbName = "mydatabase"
path = fn_Path(fldrName,dbName,tblName)
path
# Reduce the number of parquet files written using coalesce and write the dataframe to the datalake
df.coalesce(1).write.format("parquet").mode("overwrite").save(path)
# Drop the table (only dropping the metadata) if it exists in the lakehouse database
spark.sql(f"DROP TABLE IF EXISTS {dbName}.{tblName}")
# Now create the table (metadata only) and point it at the data in the datalake
spark.sql(f"CREATE TABLE {dbName}.{tblName} USING PARQUET LOCATION '{path}'")从内存中释放SQL视图
%%sql
DROP VIEW SECTION_1;
DROP VIEW CTE_DailyReportHoleActivityManHours;
DROP VIEW CTE_Dashboards_BaseData;
DROP VIEW CTE_UnitConversion;
DROP VIEW tblBitSize;
DROP VIEW tblDailyReport;
DROP VIEW tblDailyReportHole;
DROP VIEW tblDailyReportHoleActivity;
DROP VIEW tblDailyReportHoleActivityHours;
DROP VIEW tblDailyReportShift;
DROP VIEW tblDrill;
DROP VIEW tblEmployee;
DROP VIEW tblHole;
DROP VIEW tblMineProject;
DROP VIEW tblShift;从内存中释放数据帧
del DF_tblBitSize
del DF_tblDailyReport
del DF_tblDailyReportHole
del DF_tblDailyReportHoleActivity
del DF_tblDailyReportHoleActivityHours
del DF_tblDailyReportShift
del DF_tblDrill
del DF_tblDrillType
del DF_tblEmployee
del DF_tblHole
del DF_tblMineProject
del DF_tblShift
del DF_tblUnit
del DF_tblUnitType
del DF_tblWorkSubCategory
del DF_v_Dashboards_CompanyContracts
del DF_v_DailyReportShiftDrillers
del DF_v_ActivityCharges发布于 2022-04-19 05:03:13
除了,来自内存的和发布的SQL视图,以及来自内存的数据帧,一切看起来都很好。
如果应用程序需要频繁查询数据,并且需要创建VIEWS,则可以在专用SQL中创建外部表,并使用Synapse保存表的视图。这样会更有效率,而且不需要每次需要数据时都会丢弃视图和发布数据。
您还可以使用在Azure Synapse分析中使用SQL池创建和使用本机外部表,因为与在外部数据源定义中使用TYPE=HADOOP的外部表相比,本机外部表具有更好的性能。这是因为本机外部表使用本机代码访问外部数据。
您还可以参考Azure Synapse分析中无服务器SQL池的最佳实践获得有关性能优化的更多细节。
https://stackoverflow.com/questions/71887380
复制相似问题