我已经在spark中使用sqlcontext进行了一次转换,但我只想使用Spark Data frame来编写相同的查询。此查询包含join操作和SQL的case语句。sql查询编写如下:
refereshLandingData=spark.sql( "select a.Sale_ID, a.Product_ID,"
"CASE "
"WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
"ELSE a.Quantity_Sold "
"END AS Quantity_Sold, "
"CASE "
"WHEN (a.Vendor_ID IS NULL) THEN b.Vendor_ID "
"ELSE a.Vendor_ID "
"END AS Vendor_ID, "
"a.Sale_Date, a.Sale_Amount, a.Sale_Currency "
"from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )现在我想在scala和python的spark dataframe中使用等价的代码。我试过一些代码,但它
不工作的.my尝试代码如下:
joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')
joinDf.withColumn\
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),
pf.col(landingData('Quantity_Sold'))).show()在上面的代码中,连接做得很好,但是case条件不起作用。我得到--> TypeError:'DataFrame‘对象是不可调用的,我正在使用spark 2.3.2版本和Python3.7,以及类似的Scala2.11,在spark-scala的情况下,请任何人给我任何等效的代码或指南!
发布于 2020-10-18 18:29:08
这里有一个scala解决方案:假设landingData和preHoldData是您的数据帧
val landingDataDf = landingData.withColumnRenamed("Quantity_Sold","Quantity_Sold_ld")
val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold","Quantity_Sold_phd")
val joinDf = landingDataDf.join(preHoldDataDf, Seq("Sale_ID"))
joinDf
.withColumn("Quantity_Sold",
when(col("Quantity_Sold_ld").isNull , col("Quantity_Sold_phd")).otherwise(col("Quantity_Sold_ld"))
). drop("Quantity_Sold_ld","Quantity_Sold_phd")您可以对Vendor_id执行相同的操作
您的代码的问题是,您不能在withColumn操作中引用其他/旧的数据帧名称。它必须来自您正在操作的数据帧。
发布于 2020-10-18 18:40:49
下面的代码将在scala上运行&对于python,您可以稍微调整一下。
val preHoldData = spark.table("preHoldData").alias("a")
val landingData = spark.table("landingData").alias("b")
landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull, col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull, col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))https://stackoverflow.com/questions/64411973
复制相似问题