我正在使用Apache的approxSimilarityJoin方法加入两个数据集,但我看到了一些奇怪的行为。
在(内部)加入数据集之后,数据集会有一点偏差,但是每次一个或多个任务都要花费过多的时间来完成。

如您所见,每个任务的中位数为6ms (我正在一个较小的源数据集上运行它以进行测试),但一个任务需要10分钟。它几乎不使用任何CPU周期,它实际上连接数据,但如此缓慢。第二个最慢的任务运行在14s,有4倍以上的记录&实际上是溢出到磁盘。
如果你看

join本身是pos & hashValue (min散列)上的两个数据集之间的内部连接,根据min散列规范& udf计算匹配对之间的jaccard距离。
引爆哈希表:
modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))Jaccard距离函数:
override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
val xSet = x.toSparse.indices.toSet
val ySet = y.toSparse.indices.toSet
val intersectionSize = xSet.intersect(ySet).size.toDouble
val unionSize = xSet.size + ySet.size - intersectionSize
assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
1 - intersectionSize / unionSize
}加入处理过的数据集:
// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
.drop(explodeCols: _*).distinct()
// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)
// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)我尝试了缓存、重新分区甚至启用spark.speculation的组合,但都没有效果。
这些数据由必须匹配的shingles地址文本组成:53536, Evansville, WI => 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi将有一段短距离,其中有在城市或zip中有错误的记录。
这给出了相当准确的结果,但可能是连接倾斜的原因。
我的问题是:
发布于 2020-07-07 00:47:43
现在可能有点晚了,但我会在这里张贴我的答案,以帮助其他人。最近,我在匹配拼错公司名称(All executors dead MinHash LSH PySpark approxSimilarityJoin self-join on EMR cluster)方面也遇到了类似的问题。有人建议使用NGrams来减少数据的偏差,从而帮助了我。这对我有很大帮助。你也可以尝试使用例如3克或4克。
我不知道数据有多脏,但您可以尝试使用状态。它大大减少了可能匹配的数量。
真正帮助我提高匹配精度的是,通过在每个组件上运行标签传播算法,对连接的组件(由MinHashLSH生成的连接匹配组)进行后处理。这还允许您增加N( NGrams),从而缓解数据倾斜的问题,减少approxSimilarityJoin中jaccard距离参数的设置,并使用标签传播进行后处理。
最后,我目前正在研究如何使用例程图来匹配它。我发现,在某些情况下,它工作得更好,并在一定程度上减少了数据的倾斜。
https://stackoverflow.com/questions/51403709
复制相似问题