我有一份记录清单。对于我的每一条记录,我需要进行一些繁重的计算,因为我在redis中创建了一个反向索引。对于reach记录,在一个管道中执行多个redis命令(100个sadd +1 set)。
我想并行化这个索引创建部分(使用joblib),但失败了。第一个问题是,我想将redis连接传递给每个作业,但这并不起作用,因为joblib想要序列化它,这是不起作用的。因此,只需提交主机/端口并让每个进程创建自己的连接即可。
def heavy_calc_insert(value, host, port)
r = redis.Redis(host=host, port=port)
#... some calc
pipe = r.pipeline()
for bit in bits:
key = "bit:" + str(bit)
pipe.sadd(key, idx_value)
pipe.set(idx_value, id)
pipe.execute()
Parallel(n_jobs=4)(delayed(heavy_calc_insert)(value, host, port) for value in values)但是,通过这段代码,我很快就得到了一个ConnectionError
ConnectionError: Connection closed by server.我想我遇到了有太多连接的一种形式。
我该如何解决这个问题?
发布于 2020-01-22 21:25:04
使用连接池。https://github.com/andymccurdy/redis-py/blob/master/README.rst#connection-pools
在heavy_calc_insert之外创建池,并传递池而不是主机和端口。
pool = redis.ConnectionPool(host=host, port=port, db=0)
...
def heavy_calc_insert(value, pool)
r = redis.Redis(connection_pool=pool)
...Redis有默认的10,000个连接限制,并且它不会关闭未使用的连接。
连接池将控制创建的连接,并为您提供更好的性能。
https://stackoverflow.com/questions/59858617
复制相似问题