首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用python或Scala将复杂的SQL查询转换为spark-dataframe

如何使用python或Scala将复杂的SQL查询转换为spark-dataframe
EN

Stack Overflow用户
提问于 2020-10-18 17:57:11
回答 2查看 387关注 0票数 0

我已经在spark中使用sqlcontext进行了一次转换,但我只想使用Spark Data frame来编写相同的查询。此查询包含join操作和SQL的case语句。sql查询编写如下:

代码语言:javascript
复制
refereshLandingData=spark.sql( "select a.Sale_ID, a.Product_ID,"
                           "CASE "
                           "WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
                           "ELSE a.Quantity_Sold "
                           "END AS Quantity_Sold, "
                           "CASE "
                           "WHEN (a.Vendor_ID IS NULL) THEN b.Vendor_ID "
                           "ELSE a.Vendor_ID "
                           "END AS Vendor_ID, "
                           "a.Sale_Date, a.Sale_Amount, a.Sale_Currency "
                           "from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )

现在我想在scala和python的spark dataframe中使用等价的代码。我试过一些代码,但它

不工作的.my尝试代码如下:

代码语言:javascript
复制
joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')

joinDf.withColumn\
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),
 pf.col(landingData('Quantity_Sold'))).show()

在上面的代码中,连接做得很好,但是case条件不起作用。我得到--> TypeError:'DataFrame‘对象是不可调用的,我正在使用spark 2.3.2版本和Python3.7,以及类似的Scala2.11,在spark-scala的情况下,请任何人给我任何等效的代码或指南!

EN

回答 2

Stack Overflow用户

发布于 2020-10-18 18:29:08

这里有一个scala解决方案:假设landingDatapreHoldData是您的数据帧

代码语言:javascript
复制
 val landingDataDf = landingData.withColumnRenamed("Quantity_Sold","Quantity_Sold_ld")
 val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold","Quantity_Sold_phd")

 val joinDf = landingDataDf.join(preHoldDataDf, Seq("Sale_ID"))


 joinDf
 .withColumn("Quantity_Sold",
    when(col("Quantity_Sold_ld").isNull , col("Quantity_Sold_phd")).otherwise(col("Quantity_Sold_ld"))
 ). drop("Quantity_Sold_ld","Quantity_Sold_phd")

您可以对Vendor_id执行相同的操作

您的代码的问题是,您不能在withColumn操作中引用其他/旧的数据帧名称。它必须来自您正在操作的数据帧。

票数 3
EN

Stack Overflow用户

发布于 2020-10-18 18:40:49

下面的代码将在scala上运行&对于python,您可以稍微调整一下。

代码语言:javascript
复制
val preHoldData = spark.table("preHoldData").alias("a")
val landingData = spark.table("landingData").alias("b")

landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull, col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull, col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64411973

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档