以下是我如何设置内容的一些要点:
我已经将CSV文件上传到S3,并设置了胶水爬虫来创建表和模式。我有一个Glue作业设置,它使用JDBC连接将数据从Glue表写入我们的Amazon Redshift数据库。Job还负责映射列和创建红移表。通过重新运行作业,我在redshift中得到了重复的行(正如预期的那样)。
但是,有没有办法在插入新数据之前替换或删除行?
书签功能已启用,但不起作用。
如何连接到redshift,在Python中将数据推送到redshift之前,删除作业中的所有数据?
发布于 2018-09-20 16:14:32
目前,Glue不支持JDBC源代码的书签。
您可以使用postactions选项(Scala中的代码)在胶水作业中实现upsert/merge into Redshift:
val fields = sourceDf.columns.mkString(",")
glueContext.getJDBCSink(
catalogConnection = "RedshiftConnectionTest",
options = JsonOptions(Map(
"database" -> "conndb",
"dbtable" -> "staging_schema.staging_table",
"postactions" ->
s"""
DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
DROP TABLE IF EXISTS staging_schema.staging_table
"""
)),
redshiftTmpDir = tempDir,
transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))如果您只想删除现有表,则可以使用preactions参数:
glueContext.getJDBCSink(
catalogConnection = "RedshiftConnectionTest",
options = JsonOptions(Map(
"database" -> "conndb",
"dbtable" -> "dst_schema.dst_table",
"preactions" -> "DELETE FROM dst_schema.dst_table"
)),
redshiftTmpDir = tempDir,
transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))发布于 2018-09-19 14:52:57
只要您的表上有一个唯一的键,理想情况下是一个整数主键。那么我解决这个问题的方法如下:
a)从目标表中删除匹配的行
delete from target
where id in (select id from staging);b)将数据从分段插入到目标表
insert into target select * from staging;c)截断临时表
d)对两个表进行真空和分析
vacuum target to 100 percent;
analyze target;
vacuum staging;发布于 2018-09-20 15:47:44
你可以使用python模块pg8000来连接到Redfshift,并执行SQL从你的Glue脚本中删除(删除/截断)数据。pg8000是纯python,所以它可以和Glue一起工作。
请查看此链接:AWS Glue - Truncate destination postgres table prior to insert
我试过了,它工作得很好。希望这能帮到你,
https://stackoverflow.com/questions/52397646
复制相似问题