首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Ruby的读写锁实现

Ruby的读写锁实现
EN

Code Review用户
提问于 2012-02-16 11:10:40
回答 3查看 2K关注 0票数 5

我最近回答了一个关于堆栈溢出的问题,有人问我如何使Hash线程安全。他们表示,性能很重要,阅读时会有很大的偏差。我建议使用读写锁,但找不到任何地方都可用的Ruby实现。我发现的唯一一件事是2007年的一篇博客文章,其中有人说他们建了一个,但是链接被打破了。

我认为Ruby的免费读写锁实现对社区来说是件好事,所以我试着想出一个。如果你能找到任何错误,或看到任何提高性能的方法,请评论!

这个文件的最新版本是这里。有一个分叉版本这里,它使排队的读者和作者相互交织,而不是先运行排队的作者。我还没决定这是不是个好主意。如果我认为这样做更好,我会将更改合并为“主人”。

代码语言:javascript
复制
# Ruby read-write lock implementation
# Allows any number of concurrent readers, but only one concurrent writer
# (And if the "write" lock is taken, any readers who come along will have to wait)

# If readers are already active when a writer comes along, the writer will wait for
#   all the readers to finish before going ahead
# But any additional readers who come when the writer is already waiting, will also
#   wait (so writers are not starved)

# Written by Alex Dowad
# Thanks to Doug Lea for java.util.concurrent.ReentrantReadWriteLock (used for inspiration)

# Usage:
# lock = ReadWriteLock.new
# lock.with_read_lock  { data.retrieve }
# lock.with_write_lock { data.modify! }

# Implementation note: A goal for this implementation is to make the "main" (uncontended)
#   path for readers lock-free
# Only if there is reader-writer or writer-writer contention, should locks be used

require 'atomic' # must install 'atomic' gem
require 'thread'

class ReadWriteLock
  def initialize
    @counter      = Atomic.new(0)         # single integer which represents lock state
                                          # 0 = free
                                          # +1 each concurrently running reader
                                          # +(1 << 16) for each waiting OR running writer
                                          # so @counter >= (1 << 16) means at least one writer is waiting/running
                                          # and (@counter & ((1 << 16)-1)) > 0 means at least one reader is running
    @reader_q     = ConditionVariable.new # queue for waiting readers
    @reader_mutex = Mutex.new             # to protect reader queue
    @writer_q     = ConditionVariable.new # queue for waiting writers
    @writer_mutex = Mutex.new             # to protect writer queue
  end

  WRITER_INCREMENT = 1 << 16              # must be a power of 2!
  MAX_READERS      = WRITER_INCREMENT - 1

  def with_read_lock
    while(true)
      c = @counter.value
      raise "Too many reader threads!" if (c & MAX_READERS) == MAX_READERS
      if c >= WRITER_INCREMENT
        @reader_mutex.synchronize do 
          @reader_q.wait(@reader_mutex) if @counter.value >= WRITER_INCREMENT
        end
      else
        break if @counter.compare_and_swap(c,c+1)
      end
    end

    yield

    while(true)
      c = @counter.value
      if @counter.compare_and_swap(c,c-1)
        if c >= WRITER_INCREMENT && (c & MAX_READERS) == 1
          @writer_mutex.synchronize { @writer_q.signal }     
        end
        break
      end
    end
  end

  def with_write_lock(&b)
    while(true)
      c = @counter.value
      if @counter.compare_and_swap(c,c+WRITER_INCREMENT)
        @writer_mutex.synchronize do
          @writer_q.wait(@writer_mutex) if @counter.value > 0
        end
        break
      end
    end

    yield

    while(true)
      c = @counter.value
      if @counter.compare_and_swap(c,c-WRITER_INCREMENT)
        if c-WRITER_INCREMENT >= WRITER_INCREMENT
          @writer_mutex.synchronize { @writer_q.signal }
        else
          @reader_mutex.synchronize { @reader_q.broadcast }
        end
        break
      end
    end
  end
end

if __FILE__ == $0

# for performance comparison with ReadWriteLock
class SimpleMutex
  def initialize; @mutex = Mutex.new; end
  def with_read_lock
    @mutex.synchronize { yield }
  end
  alias :with_write_lock :with_read_lock
end
# for seeing whether my correctness test is doing anything...
# and for seeing how great the overhead of the test is
# (apart from the cost of locking)
class FreeAndEasy
  def with_read_lock
    yield # thread safety is for the birds... I prefer to live dangerously
  end
  alias :with_write_lock :with_read_lock
end

require 'benchmark'

def test(lock, n_readers=20, n_writers=20, reader_iterations=50, writer_iterations=50, reader_sleep=0.001, writer_sleep=0.001)
  puts "Testing #{lock.class} with #{n_readers} readers and #{n_writers} writers. Readers iterate #{reader_iterations} times, sleeping #{reader_sleep}s each time, writers iterate #{writer_iterations} times, sleeping #{writer_sleep}s each time"
  mutex = Mutex.new
  bad   = false
  data  = 0

  result = Benchmark.measure do
    readers = n_readers.times.collect do
                Thread.new do
                  reader_iterations.times do
                    lock.with_read_lock do
                      mutex.synchronize { bad = true } if (data % 2) != 0
                      sleep(reader_sleep)
                      mutex.synchronize { bad = true } if (data % 2) != 0
                    end
                  end
                end
              end
    writers = n_writers.times.collect do
                Thread.new do
                  writer_iterations.times do
                    lock.with_write_lock do
                      value = data
                      data  = value+1
                      sleep(writer_sleep)
                      data  = value+1
                    end
                  end
                end
              end

    readers.each { |t| t.join }
    writers.each { |t| t.join }
    puts "BAD!!! Readers+writers overlapped!" if mutex.synchronize { bad }
    puts "BAD!!! Writers overlapped!" if data != (n_writers * writer_iterations * 2)
  end
  puts result
end

test(ReadWriteLock.new)
test(SimpleMutex.new)
test(FreeAndEasy.new)
end

顺便说一下,我的机器上的性能结果是:

  • 17.9s用于ReadWriteLock
  • 33.4s用于SimpleMutex
  • 0.8s用于FreeAndEasy

你的是什么?

EN

回答 3

Code Review用户

回答已采纳

发布于 2012-02-18 10:14:12

我不是多线程方面的专家,但我检查了代码,下面是我的想法:

以下代码导致死锁:

代码语言:javascript
复制
l = ReadWriteLock.new
l.with_write_lock { puts 'passed' }

这是因为编写者等待自己释放锁(它增加了@counter,然后等待如果@counter > 0

代码语言:javascript
复制
while(true)
  c = @counter.value
  if @counter.compare_and_swap(c,c+WRITER_INCREMENT)
    @writer_mutex.synchronize do
      @writer_q.wait(@writer_mutex) if @counter.value > 0 # here
    end
    break
  end
end

看起来,@counter的增量和递减是原子操作,但是change @counter + do operation不是原子可能的竞争条件,例如:

代码语言:javascript
复制
# let say we have 1 writer working and 1 reader tries to do the job:
while(true)
  c = @counter.value 
  raise "Too many reader threads!" if (c & MAX_READERS) == MAX_READERS
  if c >= WRITER_INCREMENT # here @counter == WRITER_INCREMENT
    @reader_mutex.synchronize do
      if @counter.value >= WRITER_INCREMENT # here @counter == WRITER_INCREMENT
        # <- but here writer decrements WRITER_INCREMENT and does @reader_q.broadcast  (there is no synchronization to protect from this)
        @reader_q.wait(@reader_mutex) # then we wait until writer will broadcast, but it will not happen -> deadlock
      end
    end
  else
    break if @counter.compare_and_swap(c,c+1)
  end
end

在你的基准代码中,我相信你想写这样的东西

代码语言:javascript
复制
lock.with_write_lock do
  value = data + 1
  sleep(writer_sleep)
  data  = value + 1
end

而不是

代码语言:javascript
复制
lock.with_write_lock do
  value = data
  data = value + 1
  sleep(writer_sleep)
  data  = value + 1 # 2 times assign `value + 1` to `data`
end

我认为对实现进行性能优化为时尚早,但以下是Intel(R) Core(TM)2 Duo CPU P7570 @ 2.26GHz机器基准测试的结果:

代码语言:javascript
复制
# MRI 1.9.2 (with global interpreter lock)
  0.090000   0.140000   0.230000 (  1.419659)
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.050000   0.090000   0.140000 (  2.356187)
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Writers overlapped!
  0.010000   0.030000   0.040000 (  0.063878)

# MRI 1.9.3 (with global interpreter lock)
  0.110000   0.100000   0.210000 (  1.405219)
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.030000   0.070000   0.100000 (  2.292269)
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Writers overlapped!
  0.010000   0.010000   0.020000 (  0.076124)

# Jruby 1.6.5 (with native threads)
  1.783000   0.000000   1.783000 (  1.783000)
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  2.484000   0.000000   2.484000 (  2.484000)
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Writers overlapped!
  0.151000   0.000000   0.151000 (  0.151000)

注意: MRI基准测试经常因deadlock detected错误(或挂起)而失败。

Sidenote:我认为把这个项目放到github中是个好主意,这样其他人就可以参与拉请求/bug报告了。我将参加:)

修复

后的

更新

我在修复之后运行了基准测试,下面是结果(我多次运行它,没有遇到死锁错误):

代码语言:javascript
复制
READ INTENSIVE (80% read, 20% write):
Testing ReadWriteLock with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.070000   0.080000   0.150000 (  0.557659)
WRITE INTENSIVE (80% write, 20% read):
Testing ReadWriteLock with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.140000   0.080000   0.220000 (  2.015721)
BALANCED (50% read, 50% write):
Testing ReadWriteLock with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.090000   0.080000   0.170000 (  1.286458)
READ INTENSIVE (80% read, 20% write):
Testing SimpleMutex with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.080000   0.070000   0.150000 (  2.401325)
WRITE INTENSIVE (80% write, 20% read):
Testing SimpleMutex with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.050000   0.090000   0.140000 (  2.361558)
BALANCED (50% read, 50% write):
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.070000   0.080000   0.150000 (  2.357656)
READ INTENSIVE (80% read, 20% write):
Testing FreeAndEasy with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.030000   0.040000   0.070000 (  0.072458)
WRITE INTENSIVE (80% write, 20% read):
Testing FreeAndEasy with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.010000   0.050000   0.060000 (  0.079889)
BALANCED (50% read, 50% write):
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.030000   0.030000   0.060000 (  0.072672)
票数 2
EN

Code Review用户

发布于 2013-02-12 15:06:48

这里有一个与您类似的简单实现,它不需要添加任何额外的宝石(比如原子)--它使用一个队列来轻松地处理编写器/读取器的信令,尽管我认为使用ConditionVariable可以更快地完成这一任务。在我的情况下,不使用额外的宝石是很重要的,因为我需要能够很容易地分发给那些不是红宝石的人。

它基于以下伪代码:http://en.wikipedia.org/wiki/Readers-writers_问题

虽然速度稍慢,但似乎并没有受到多位作家的惩罚。我想知道它如何在其他系统上执行,而不是您的实现。我相信它是正确的,它不会显示任何错误,根据您的测试框架。

代码语言:javascript
复制
class LimitWriters
    def initialize
        @noWaiting = Mutex.new
        # Users are either a writer or any number of readers
        @users = Queue.new
        @users.push(true)
        @readers = 0
        @readersSem = Mutex.new
    end
    def with_write_lock
        @noWaiting.lock
        @users.pop
        @noWaiting.unlock
        yield
        @users.push(true)
    end
    def with_read_lock
        prev,curr = nil,nil
        @noWaiting.lock
        @readersSem.synchronize {
            prev = @readers
            @readers += 1
        }
        @users.pop if prev==0
        @noWaiting.unlock
        yield
        @readersSem.synchronize {
            @readers -= 1
            curr = @readers
        }
        @users.push(true) if curr==0
    end
end
票数 2
EN

Code Review用户

发布于 2012-02-21 00:54:12

关于github版本3d9881516083a8331e4bd0e03a53696a47cb2a08:

这种模式看起来不安全(或者至少注释是错误的):

代码语言:javascript
复制
@writer_mutex.synchronize do
   # So we have to do another check inside the synchronized section
   # If a writer OR reader is running, then go to sleep
   c = @counter.value
   @writer_q.wait(@writer_mutex) if (c >= RUNNING_WRITER) || ((c & MAX_READERS) > 0)
end

你没有一个互斥保护@计数器,你正在改变它左右。所以@计数器在你检查c和睡觉之间会发生变化。我还没有对它进行足够的分析,以判断它是否真的是一个问题。

此外,此代码还存在严重的饥饿问题。尝试打印"w“和"r”的每一个迭代的读者和作家线程。读者只能在作者完成后才能做一些工作。

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/9038

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档