首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用AWS胶的火花扩展

使用AWS胶的火花扩展
EN

Stack Overflow用户
提问于 2022-09-30 13:21:29
回答 1查看 154关注 0票数 0

我已经在本地创建了一个脚本,它使用spark扩展'uk.co.gresearch.spark:spark-extension_2.12:2.2.0-3.3‘以简单的方式比较不同的DataFrames。

然而,当我在AWS上尝试这一点时,我遇到了一些问题,并收到了以下错误:ModuleNotFoundError: No模块,名为“gresearch”

我尝试从本地磁盘复制.jar文件,当我在本地初始化spark会话并收到以下消息时引用了该文件:

..。存储在以下文件中的包的jars :/User/“SOME_NAME”/.in 2/jars uk.co.g科研·火花#火花-扩展_2.12添加为依赖项.

在该路径中,我找到了一个名为: uk.co.gresearch.spark_spark-extension_2.12-2.2.0-3.3.jar的文件,我将其复制到S3,并在Jar路径中引用该文件。

但这行不通..。,您将如何以正确的方式设置它?

我在AWS Glue上测试这个示例代码如下所示:

代码语言:javascript
复制
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

appName = 'test_gresearch'
spark_conf = SparkConf()
spark_conf.setAll([('spark.jars.packages', 'uk.co.gresearch.spark:spark- 
extension_2.12:2.2.0-3.3')])
spark=SparkSession.builder.config(conf=spark_conf)\
.enableHiveSupport().appName(appName).getOrCreate()

from gresearch.spark.diff import *

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

df1.show()
df2.show()

options = DiffOptions().with_change_column('changes')
df1.diff_with_options(df2, options, 'id').show()

任何小费都是受欢迎的。提前谢谢你!

问候

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-20 07:42:02

在与AWS支持团队进行了一些调查之后,我被指示通过Python路径包含包.jar文件,因为.jar文件包含嵌入式package。因此,应该下载.jar文件的正确版本(.jar文件的版本是我最后使用的版本),并上传到S3中,并在Glue作业设置的https://mvnrepository.com/artifact/uk.co.gresearch.spark/spark-extension_2.12/2.1.0-3.1库路径下引用(例如-https://mvnrepository.com/artifact/uk.co.gresearch.spark/spark-extension_2.12/2.1.0-3.1)。

代码语言:javascript
复制
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()   
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()

left = spark.createDataFrame([(1, "one"), (2, "two"), (3, "three")], ["id", "value"])
right = spark.createDataFrame([(1, "one"), (2, "Two"), (4, "four")], ["id", "value"])

from gresearch.spark.diff import *

left.diff(right, "id").show()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73909441

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档