根据我的previous question与杰迪斯和线程。我将代码更改为使用JedisPool而不是Jedis。但是,随着线程的增加,仍然程序停止运行。我试图增加.setMaxIdle(8000)和.setMaxTotal(8000),并将其修复为临时的,但后来在其他运行中,经过一些迭代之后,它再次卡住了。我猜想由于池中缺少连接(我关闭了它们),但是线程似乎没有释放连接。
下面是我的连接的更新版本:
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
object redisOp{
@transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
def apply(set: RDD[Int]): Unit = {
val spark = SparkConstructor()
val sc = spark.sparkContext
// initialize Parents and Ranks key-values
val parents = set.map(i => ("p"+i, i.toString))
val ranks = set.map(i => ("r"+i, 1.toString))
sc.toRedisKV(parents) // using spark-redis packege here only, ignore it.
sc.toRedisKV(ranks)
log.warn("***Initialized Redis***")
}
val jedisConfig = new JedisPoolConfig() // Check from here (object's values and variables)
jedisConfig.setMaxIdle(8000) //TODO: a better configuration?
jedisConfig.setMaxTotal(8000)
lazy val pool = new JedisPool(jedisConfig, "localhost")
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
r.close() // closing back to pool
res
}
// other methods are similar to find()...
}发布于 2021-07-19 05:49:33
问题在于递归的实现。在不释放资源的情况下调用下一个递归堆栈。因此,在某种程度上,最新的堆栈是在稀缺的资源,因为旧的堆栈保存他们的资源。
因此,在调用下一个递归堆栈之前释放资源。
例如。
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val rget = r.get(s"p$u")
r.close() // closing back to pool
val res = Option(rget).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
res
}https://stackoverflow.com/questions/68432692
复制相似问题