首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >奇怪的性能问题火花LSH MinHash approxSimilarityJoin

奇怪的性能问题火花LSH MinHash approxSimilarityJoin
EN

Stack Overflow用户
提问于 2018-07-18 13:47:43
回答 1查看 1.5K关注 0票数 6

我正在使用Apache的approxSimilarityJoin方法加入两个数据集,但我看到了一些奇怪的行为。

在(内部)加入数据集之后,数据集会有一点偏差,但是每次一个或多个任务都要花费过多的时间来完成。

如您所见,每个任务的中位数为6ms (我正在一个较小的源数据集上运行它以进行测试),但一个任务需要10分钟。它几乎不使用任何CPU周期,它实际上连接数据,但如此缓慢。第二个最慢的任务运行在14s,有4倍以上的记录&实际上是溢出到磁盘。

如果你看

join本身是pos & hashValue (min散列)上的两个数据集之间的内部连接,根据min散列规范& udf计算匹配对之间的jaccard距离。

引爆哈希表:

代码语言:javascript
复制
modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))

Jaccard距离函数:

代码语言:javascript
复制
 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val xSet = x.toSparse.indices.toSet
    val ySet = y.toSparse.indices.toSet
    val intersectionSize = xSet.intersect(ySet).size.toDouble
    val unionSize = xSet.size + ySet.size - intersectionSize
    assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
    1 - intersectionSize / unionSize
  }

加入处理过的数据集:

代码语言:javascript
复制
// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
  .drop(explodeCols: _*).distinct()

// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
  distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)

// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)

我尝试了缓存、重新分区甚至启用spark.speculation的组合,但都没有效果。

这些数据由必须匹配的shingles地址文本组成:53536, Evansville, WI => 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi将有一段短距离,其中有在城市或zip中有错误的记录。

这给出了相当准确的结果,但可能是连接倾斜的原因。

我的问题是:

  • 是什么导致了这一差异?(一项任务需要很长时间,尽管它的记录较少)
  • 我怎样才能在不失去准确性的情况下防止这种最小值的偏差?
  • 有更好的方法在规模上做到这一点吗?(我无法将数百万条记录与位置数据集中的所有记录进行比较)
EN

回答 1

Stack Overflow用户

发布于 2020-07-07 00:47:43

现在可能有点晚了,但我会在这里张贴我的答案,以帮助其他人。最近,我在匹配拼错公司名称(All executors dead MinHash LSH PySpark approxSimilarityJoin self-join on EMR cluster)方面也遇到了类似的问题。有人建议使用NGrams来减少数据的偏差,从而帮助了我。这对我有很大帮助。你也可以尝试使用例如3克或4克。

我不知道数据有多脏,但您可以尝试使用状态。它大大减少了可能匹配的数量。

真正帮助我提高匹配精度的是,通过在每个组件上运行标签传播算法,对连接的组件(由MinHashLSH生成的连接匹配组)进行后处理。这还允许您增加N( NGrams),从而缓解数据倾斜的问题,减少approxSimilarityJoin中jaccard距离参数的设置,并使用标签传播进行后处理。

最后,我目前正在研究如何使用例程图来匹配它。我发现,在某些情况下,它工作得更好,并在一定程度上减少了数据的倾斜。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51403709

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档