我最近回答了一个关于堆栈溢出的问题,有人问我如何使Hash线程安全。他们表示,性能很重要,阅读时会有很大的偏差。我建议使用读写锁,但找不到任何地方都可用的Ruby实现。我发现的唯一一件事是2007年的一篇博客文章,其中有人说他们建了一个,但是链接被打破了。
我认为Ruby的免费读写锁实现对社区来说是件好事,所以我试着想出一个。如果你能找到任何错误,或看到任何提高性能的方法,请评论!
这个文件的最新版本是这里。有一个分叉版本这里,它使排队的读者和作者相互交织,而不是先运行排队的作者。我还没决定这是不是个好主意。如果我认为这样做更好,我会将更改合并为“主人”。
# Ruby read-write lock implementation
# Allows any number of concurrent readers, but only one concurrent writer
# (And if the "write" lock is taken, any readers who come along will have to wait)
# If readers are already active when a writer comes along, the writer will wait for
# all the readers to finish before going ahead
# But any additional readers who come when the writer is already waiting, will also
# wait (so writers are not starved)
# Written by Alex Dowad
# Thanks to Doug Lea for java.util.concurrent.ReentrantReadWriteLock (used for inspiration)
# Usage:
# lock = ReadWriteLock.new
# lock.with_read_lock { data.retrieve }
# lock.with_write_lock { data.modify! }
# Implementation note: A goal for this implementation is to make the "main" (uncontended)
# path for readers lock-free
# Only if there is reader-writer or writer-writer contention, should locks be used
require 'atomic' # must install 'atomic' gem
require 'thread'
class ReadWriteLock
def initialize
@counter = Atomic.new(0) # single integer which represents lock state
# 0 = free
# +1 each concurrently running reader
# +(1 << 16) for each waiting OR running writer
# so @counter >= (1 << 16) means at least one writer is waiting/running
# and (@counter & ((1 << 16)-1)) > 0 means at least one reader is running
@reader_q = ConditionVariable.new # queue for waiting readers
@reader_mutex = Mutex.new # to protect reader queue
@writer_q = ConditionVariable.new # queue for waiting writers
@writer_mutex = Mutex.new # to protect writer queue
end
WRITER_INCREMENT = 1 << 16 # must be a power of 2!
MAX_READERS = WRITER_INCREMENT - 1
def with_read_lock
while(true)
c = @counter.value
raise "Too many reader threads!" if (c & MAX_READERS) == MAX_READERS
if c >= WRITER_INCREMENT
@reader_mutex.synchronize do
@reader_q.wait(@reader_mutex) if @counter.value >= WRITER_INCREMENT
end
else
break if @counter.compare_and_swap(c,c+1)
end
end
yield
while(true)
c = @counter.value
if @counter.compare_and_swap(c,c-1)
if c >= WRITER_INCREMENT && (c & MAX_READERS) == 1
@writer_mutex.synchronize { @writer_q.signal }
end
break
end
end
end
def with_write_lock(&b)
while(true)
c = @counter.value
if @counter.compare_and_swap(c,c+WRITER_INCREMENT)
@writer_mutex.synchronize do
@writer_q.wait(@writer_mutex) if @counter.value > 0
end
break
end
end
yield
while(true)
c = @counter.value
if @counter.compare_and_swap(c,c-WRITER_INCREMENT)
if c-WRITER_INCREMENT >= WRITER_INCREMENT
@writer_mutex.synchronize { @writer_q.signal }
else
@reader_mutex.synchronize { @reader_q.broadcast }
end
break
end
end
end
end
if __FILE__ == $0
# for performance comparison with ReadWriteLock
class SimpleMutex
def initialize; @mutex = Mutex.new; end
def with_read_lock
@mutex.synchronize { yield }
end
alias :with_write_lock :with_read_lock
end
# for seeing whether my correctness test is doing anything...
# and for seeing how great the overhead of the test is
# (apart from the cost of locking)
class FreeAndEasy
def with_read_lock
yield # thread safety is for the birds... I prefer to live dangerously
end
alias :with_write_lock :with_read_lock
end
require 'benchmark'
def test(lock, n_readers=20, n_writers=20, reader_iterations=50, writer_iterations=50, reader_sleep=0.001, writer_sleep=0.001)
puts "Testing #{lock.class} with #{n_readers} readers and #{n_writers} writers. Readers iterate #{reader_iterations} times, sleeping #{reader_sleep}s each time, writers iterate #{writer_iterations} times, sleeping #{writer_sleep}s each time"
mutex = Mutex.new
bad = false
data = 0
result = Benchmark.measure do
readers = n_readers.times.collect do
Thread.new do
reader_iterations.times do
lock.with_read_lock do
mutex.synchronize { bad = true } if (data % 2) != 0
sleep(reader_sleep)
mutex.synchronize { bad = true } if (data % 2) != 0
end
end
end
end
writers = n_writers.times.collect do
Thread.new do
writer_iterations.times do
lock.with_write_lock do
value = data
data = value+1
sleep(writer_sleep)
data = value+1
end
end
end
end
readers.each { |t| t.join }
writers.each { |t| t.join }
puts "BAD!!! Readers+writers overlapped!" if mutex.synchronize { bad }
puts "BAD!!! Writers overlapped!" if data != (n_writers * writer_iterations * 2)
end
puts result
end
test(ReadWriteLock.new)
test(SimpleMutex.new)
test(FreeAndEasy.new)
end顺便说一下,我的机器上的性能结果是:
ReadWriteLockSimpleMutexFreeAndEasy你的是什么?
发布于 2012-02-18 10:14:12
我不是多线程方面的专家,但我检查了代码,下面是我的想法:
以下代码导致死锁:
l = ReadWriteLock.new
l.with_write_lock { puts 'passed' }这是因为编写者等待自己释放锁(它增加了@counter,然后等待如果@counter > 0:
while(true)
c = @counter.value
if @counter.compare_and_swap(c,c+WRITER_INCREMENT)
@writer_mutex.synchronize do
@writer_q.wait(@writer_mutex) if @counter.value > 0 # here
end
break
end
end看起来,@counter的增量和递减是原子操作,但是change @counter + do operation不是原子可能的竞争条件,例如:
# let say we have 1 writer working and 1 reader tries to do the job:
while(true)
c = @counter.value
raise "Too many reader threads!" if (c & MAX_READERS) == MAX_READERS
if c >= WRITER_INCREMENT # here @counter == WRITER_INCREMENT
@reader_mutex.synchronize do
if @counter.value >= WRITER_INCREMENT # here @counter == WRITER_INCREMENT
# <- but here writer decrements WRITER_INCREMENT and does @reader_q.broadcast (there is no synchronization to protect from this)
@reader_q.wait(@reader_mutex) # then we wait until writer will broadcast, but it will not happen -> deadlock
end
end
else
break if @counter.compare_and_swap(c,c+1)
end
end在你的基准代码中,我相信你想写这样的东西
lock.with_write_lock do
value = data + 1
sleep(writer_sleep)
data = value + 1
end而不是
lock.with_write_lock do
value = data
data = value + 1
sleep(writer_sleep)
data = value + 1 # 2 times assign `value + 1` to `data`
end我认为对实现进行性能优化为时尚早,但以下是Intel(R) Core(TM)2 Duo CPU P7570 @ 2.26GHz机器基准测试的结果:
# MRI 1.9.2 (with global interpreter lock)
0.090000 0.140000 0.230000 ( 1.419659)
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.050000 0.090000 0.140000 ( 2.356187)
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Writers overlapped!
0.010000 0.030000 0.040000 ( 0.063878)
# MRI 1.9.3 (with global interpreter lock)
0.110000 0.100000 0.210000 ( 1.405219)
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.030000 0.070000 0.100000 ( 2.292269)
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Writers overlapped!
0.010000 0.010000 0.020000 ( 0.076124)
# Jruby 1.6.5 (with native threads)
1.783000 0.000000 1.783000 ( 1.783000)
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
2.484000 0.000000 2.484000 ( 2.484000)
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Writers overlapped!
0.151000 0.000000 0.151000 ( 0.151000)注意: MRI基准测试经常因deadlock detected错误(或挂起)而失败。
Sidenote:我认为把这个项目放到github中是个好主意,这样其他人就可以参与拉请求/bug报告了。我将参加:)
修复
更新
我在修复之后运行了基准测试,下面是结果(我多次运行它,没有遇到死锁错误):
READ INTENSIVE (80% read, 20% write):
Testing ReadWriteLock with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.070000 0.080000 0.150000 ( 0.557659)
WRITE INTENSIVE (80% write, 20% read):
Testing ReadWriteLock with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.140000 0.080000 0.220000 ( 2.015721)
BALANCED (50% read, 50% write):
Testing ReadWriteLock with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.090000 0.080000 0.170000 ( 1.286458)
READ INTENSIVE (80% read, 20% write):
Testing SimpleMutex with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.080000 0.070000 0.150000 ( 2.401325)
WRITE INTENSIVE (80% write, 20% read):
Testing SimpleMutex with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.050000 0.090000 0.140000 ( 2.361558)
BALANCED (50% read, 50% write):
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
0.070000 0.080000 0.150000 ( 2.357656)
READ INTENSIVE (80% read, 20% write):
Testing FreeAndEasy with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
0.030000 0.040000 0.070000 ( 0.072458)
WRITE INTENSIVE (80% write, 20% read):
Testing FreeAndEasy with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
0.010000 0.050000 0.060000 ( 0.079889)
BALANCED (50% read, 50% write):
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
0.030000 0.030000 0.060000 ( 0.072672)发布于 2013-02-12 15:06:48
这里有一个与您类似的简单实现,它不需要添加任何额外的宝石(比如原子)--它使用一个队列来轻松地处理编写器/读取器的信令,尽管我认为使用ConditionVariable可以更快地完成这一任务。在我的情况下,不使用额外的宝石是很重要的,因为我需要能够很容易地分发给那些不是红宝石的人。
它基于以下伪代码:http://en.wikipedia.org/wiki/Readers-writers_问题
虽然速度稍慢,但似乎并没有受到多位作家的惩罚。我想知道它如何在其他系统上执行,而不是您的实现。我相信它是正确的,它不会显示任何错误,根据您的测试框架。
class LimitWriters
def initialize
@noWaiting = Mutex.new
# Users are either a writer or any number of readers
@users = Queue.new
@users.push(true)
@readers = 0
@readersSem = Mutex.new
end
def with_write_lock
@noWaiting.lock
@users.pop
@noWaiting.unlock
yield
@users.push(true)
end
def with_read_lock
prev,curr = nil,nil
@noWaiting.lock
@readersSem.synchronize {
prev = @readers
@readers += 1
}
@users.pop if prev==0
@noWaiting.unlock
yield
@readersSem.synchronize {
@readers -= 1
curr = @readers
}
@users.push(true) if curr==0
end
end发布于 2012-02-21 00:54:12
关于github版本3d9881516083a8331e4bd0e03a53696a47cb2a08:
这种模式看起来不安全(或者至少注释是错误的):
@writer_mutex.synchronize do
# So we have to do another check inside the synchronized section
# If a writer OR reader is running, then go to sleep
c = @counter.value
@writer_q.wait(@writer_mutex) if (c >= RUNNING_WRITER) || ((c & MAX_READERS) > 0)
end你没有一个互斥保护@计数器,你正在改变它左右。所以@计数器在你检查c和睡觉之间会发生变化。我还没有对它进行足够的分析,以判断它是否真的是一个问题。
此外,此代码还存在严重的饥饿问题。尝试打印"w“和"r”的每一个迭代的读者和作家线程。读者只能在作者完成后才能做一些工作。
https://codereview.stackexchange.com/questions/9038
复制相似问题