我正在为电影做一个命令系统,使用这里可用的MovieLens数据集:http://grouplens.org/datasets/movielens/
为了计算这个命令系统,我在scala中使用了Flink的ML库,特别是ALS算法(org.apache.flink.ml.recommendation.ALS)。
我首先将电影的收视率映射到DataSet[(Int, Int, Double)]中,然后创建一个trainingSet和一个testSet (参见下面的代码)。
我的问题是,当我在整个数据集(所有评级)中使用ALS.fit函数时,没有bug,但是如果我只删除一个评级,那么fit函数就不再工作了,我不明白为什么。
你有什么想法吗?)
使用的代码:
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)PreProcessing.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}Processing.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}“但如果我只删除一个评级”“
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)错误:
06/19/2015 15:00:24 CoGroup ( org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4)的CoGroup切换到失败) java.lang.ArrayIndexOutOfBoundsException: 5 在org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358) 在org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635) 在org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152) ..。
发布于 2015-06-19 16:57:42
问题是first操作符与Flink的ALS实现的setTemporaryPath参数相结合。为了理解这个问题,让我快速解释一下阻塞ALS算法是如何工作的。
交替最小二乘的分块实现首先将给定的评分矩阵按用户和项目分成块。对于这些块,计算路由信息。该路由信息分别说明哪个用户/项目块接收来自哪个项/用户块的输入。然后,启动ALS迭代。
因为Flink的底层执行引擎是并行流数据流引擎,所以它试图以流水线方式执行数据流的尽可能多的部分。这要求所有管道操作员同时在线。这具有Flink避免实现中间结果的优点,而中间结果可能会大得令人望而却步。缺点是可用内存必须在所有正在运行的操作符之间共享。在ALS中,单个DataSet元素(例如,用户/项块)的大小相当大,这是不需要的。
为了解决这个问题,如果设置了一个temporaryPath,并不是实现的所有操作符都同时执行。路径定义可以存储中间结果的位置。因此,如果定义了临时路径,那么ALS首先计算用户块的路由信息并将其写入磁盘,然后计算项块的路由信息并将其写入磁盘,最后但并非最不重要的是,它启动ALS迭代,从临时路径读取路由信息。
用户和项目块路由信息的计算都取决于给定的分级数据集。在您的情况下,当您计算用户路由信息时,它将首先读取评等数据集并在其上应用first运算符。first运算符从基础数据集中返回n-arbitrary元素。现在的问题是,Flink没有存储这个first操作的结果来计算项路由信息。相反,当您开始计算项路由信息时,Flink将从其源开始重新执行数据流。这意味着它从磁盘读取分级数据集,并再次将first运算符应用到磁盘上。与第一次first操作的结果相比,在许多情况下,这将为您提供一组不同的评级。因此,生成的路由信息不一致,ALS失败。
您可以通过将first操作符的结果具体化来规避这个问题,并将此结果用作ALS算法的输入。对象FlinkMLTools包含一个方法persist,它接受一个DataSet,将它写入给定的路径,然后返回一个新的DataSet,它读取刚刚写好的DataSet。这允许您分解生成的数据流图。
val firstTrainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.first((ratings.count()-1).toInt)
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS/")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)或者,您可以尝试不设置temporaryPath。然后,所有的步骤(路由信息计算和als迭代)都以流水线方式执行。这意味着用户和项路由信息计算都使用来自first运算符的相同的输入数据集。
Flink社区目前正在努力将操作符的中间结果保存在内存中。这将允许将first运算符的结果固定在一起,这样它就不会被计算两次,因此,由于它的非确定性性质,不会给出不同的结果。
https://stackoverflow.com/questions/30939386
复制相似问题