我已经在Spark 2中播放过流媒体数据。
我想用dropDuplicates方法复制记录。
我在Spark网站上发现我可以在watermark中使用dropDuplicates。
这是我的带水印的代码,不带dropDuplicates方法:
parsed = parsed_opc \
.withWatermark("sourceTimeStamp", "10 minutes") \
.groupBy(
window(parsed_opc.sourceTimeStamp, "4 seconds"),
parsed_opc.id
) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg")\
.orderBy("avg", ascending=True)这段代码可以工作。但是当我想像这样添加dropDuplicates时:
parsed = parsed_opc \
.withWatermark("sourceTimeStamp", "10 minutes") \
.dropDuplicates("id", "sourceTimeStamp") \
.groupBy(
window(parsed_opc.sourceTimeStamp, "4 seconds"),
parsed_opc.id
) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg")\
.orderBy("avg", ascending=True)它抛出一个错误:TypeError: dropDuplicates() takes from 1 to 2 positional arguments but 3 were given。
我不明白为什么会抛出这个错误。这种用法在Spark网站上也有,就像这样。这个错误的原因是什么?
发布于 2019-07-21 10:18:31
您需要使用方括号在dropDuplicates()方法中声明多个列。
如下所示:
parsed = parsed_opc \
.withWatermark("sourceTimeStamp", "10 minutes") \
.dropDuplicates(["id", "sourceTimeStamp"]) \
.groupBy(
window(parsed_opc.sourceTimeStamp, "4 seconds"),
parsed_opc.id
) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg")\
.orderBy("avg", ascending=True)https://stackoverflow.com/questions/57128726
复制相似问题