首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >新版本Ruby的读写锁实现

新版本Ruby的读写锁实现
EN

Code Review用户
提问于 2012-02-20 17:03:27
回答 1查看 1.1K关注 0票数 2

最近我发现在互联网上没有Ruby的免费读写锁实现,所以我自己构建了一个。这样做的目的是将其无限期地发布在GitHub上,供整个Ruby社区使用。

代码在https://github.com/alexdowad/showcase/blob/master/ruby-threads/read_写_lock.rb。为了方便起见,还在下面重复一遍。(为了避免读者饥饿,还有另外一个版本;它目前在https://github.com/alexdowad/showcase/blob/fair-to-readers/ruby-threads/read_写_lock.rb上。)

非常友好地回顾了第一个版本,发现了一个严重的错误。为了修复这个bug,我不得不重新考虑实现,并做一些重大的更改。我希望有人能够审查新版本,首先是正确性,然后是任何提高性能的方法。此外,如果您有一个多核处理器,请尝试运行该文件(它有一个内置测试脚本)。没有出现在我的单核机器上的问题可能更容易出现在多个核上。

代码语言:javascript
复制
# Ruby read-write lock implementation
# Allows any number of concurrent readers, but only one concurrent writer
# (And if the "write" lock is taken, any readers who come along will have to wait)

# If readers are already active when a writer comes along, the writer will wait for
#   all the readers to finish before going ahead
# But any additional readers who come when the writer is already waiting, will also
#   wait (so writers are not starved)

# Written by Alex Dowad
# Bug fixes contributed by Alex Kliuchnikau
# Thanks to Doug Lea for java.util.concurrent.ReentrantReadWriteLock (used for inspiration)

# Usage:
# lock = ReadWriteLock.new
# lock.with_read_lock  { data.retrieve }
# lock.with_write_lock { data.modify! }

# Implementation notes: 
# A goal is to make the uncontended path for both readers/writers lock-free
# Only if there is reader-writer or writer-writer contention, should locks be used
# Internal state is represented by a single integer ("counter"), and updated 
#  using atomic compare-and-swap operations
# When the counter is 0, the lock is free
# Each reader increments the counter by 1 when acquiring a read lock
#   (and decrements by 1 when releasing the read lock)
# The counter is increased by (1 << 15) for each writer waiting to acquire the
#   write lock, and by (1 << 30) if the write lock is taken

require 'atomic'
require 'thread'

class ReadWriteLock
  def initialize
    @counter      = Atomic.new(0)         # single integer which represents lock state
    @reader_q     = ConditionVariable.new # queue for waiting readers
    @reader_mutex = Mutex.new             # to protect reader queue
    @writer_q     = ConditionVariable.new # queue for waiting writers
    @writer_mutex = Mutex.new             # to protect writer queue
  end

  WAITING_WRITER  = 1 << 15
  RUNNING_WRITER  = 1 << 30
  MAX_READERS     = WAITING_WRITER - 1
  MAX_WRITERS     = RUNNING_WRITER - MAX_READERS - 1

  def with_read_lock
    acquire_read_lock
    yield
    release_read_lock
  end

  def with_write_lock
    acquire_write_lock
    yield
    release_write_lock
  end

  def acquire_read_lock
    while(true)
      c = @counter.value
      raise "Too many reader threads!" if (c & MAX_READERS) == MAX_READERS

      # If a writer is running OR waiting, we need to wait
      if c >= WAITING_WRITER
        # But it is possible that the writer could finish and decrement @counter right here...
        @reader_mutex.synchronize do 
          # So check again inside the synchronized section
          @reader_q.wait(@reader_mutex) if @counter.value >= WAITING_WRITER
        end
      else
        break if @counter.compare_and_swap(c,c+1)
      end
    end    
  end

  def release_read_lock
    while(true)
      c = @counter.value
      if @counter.compare_and_swap(c,c-1)
        # If one or more writers were waiting, and we were the last reader, wake a writer up
        if c >= WAITING_WRITER && (c & MAX_READERS) == 1
          @writer_mutex.synchronize { @writer_q.signal }
        end
        break
      end
    end
  end

  def acquire_write_lock
    while(true)
      c = @counter.value
      raise "Too many writers!" if (c & MAX_WRITERS) == MAX_WRITERS

      if c == 0 # no readers OR writers running
        # if we successfully swap the RUNNING_WRITER bit on, then we can go ahead
        break if @counter.compare_and_swap(0,RUNNING_WRITER)
      elsif @counter.compare_and_swap(c,c+WAITING_WRITER)
        while(true)
          # Now we have successfully incremented, so no more readers will be able to increment
          #   (they will wait instead)
          # However, readers OR writers could decrement right here, OR another writer could increment
          @writer_mutex.synchronize do
            # So we have to do another check inside the synchronized section
            # If a writer OR reader is running, then go to sleep
            c = @counter.value
            @writer_q.wait(@writer_mutex) if (c >= RUNNING_WRITER) || ((c & MAX_READERS) > 0)
          end

          # We just came out of a wait
          # If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
          # Then we are OK to stop waiting and go ahead
          # Otherwise go back and wait again
          c = @counter.value
          break if (c < RUNNING_WRITER) && @counter.compare_and_swap(c,c+RUNNING_WRITER-WAITING_WRITER)
        end
        break
      end
    end
  end

  def release_write_lock
    while(true)
      c = @counter.value
      if @counter.compare_and_swap(c,c-RUNNING_WRITER)
        if (c & MAX_WRITERS) > 0 # if any writers are waiting...
          @writer_mutex.synchronize { @writer_q.signal }
        else
          @reader_mutex.synchronize { @reader_q.broadcast }
        end
        break
      end
    end
  end
end

if __FILE__ == $0

# for performance comparison with ReadWriteLock
class SimpleMutex
  def initialize; @mutex = Mutex.new; end
  def with_read_lock
    @mutex.synchronize { yield }
  end
  alias :with_write_lock :with_read_lock
end

# for seeing whether my correctness test is doing anything...
# and for seeing how great the overhead of the test is
# (apart from the cost of locking)
class FreeAndEasy
  def with_read_lock
    yield # thread safety is for the birds... I prefer to live dangerously
  end
  alias :with_write_lock :with_read_lock
end

require 'benchmark'

TOTAL_THREADS = 40 # set this number as high as practicable!

def test(lock)
  puts "READ INTENSIVE (80% read, 20% write):"
  single_test(lock, (TOTAL_THREADS * 0.8).floor, (TOTAL_THREADS * 0.2).floor)
  puts "WRITE INTENSIVE (80% write, 20% read):"
  single_test(lock, (TOTAL_THREADS * 0.2).floor, (TOTAL_THREADS * 0.8).floor)
  puts "BALANCED (50% read, 50% write):"
  single_test(lock, (TOTAL_THREADS * 0.5).floor, (TOTAL_THREADS * 0.5).floor)
end

def single_test(lock, n_readers, n_writers, reader_iterations=50, writer_iterations=50, reader_sleep=0.001, writer_sleep=0.001)
  puts "Testing #{lock.class} with #{n_readers} readers and #{n_writers} writers. Readers iterate #{reader_iterations} times, sleeping #{reader_sleep}s each time, writers iterate #{writer_iterations} times, sleeping #{writer_sleep}s each time"
  mutex = Mutex.new
  bad   = false
  data  = 0

  result = Benchmark.measure do
    readers = n_readers.times.collect do
                Thread.new do
                  reader_iterations.times do
                    lock.with_read_lock do
                      mutex.synchronize { bad = true } if (data % 2) != 0
                      sleep(reader_sleep)
                      mutex.synchronize { bad = true } if (data % 2) != 0
                    end
                  end
                end
              end
    writers = n_writers.times.collect do
                Thread.new do
                  writer_iterations.times do
                    lock.with_write_lock do
                      # invariant: other threads should NEVER see "data" as an odd number
                      value = (data += 1)
                      # if a reader runs right now, this invariant will be violated
                      sleep(writer_sleep)
                      # this looks like a strange way to increment twice;
                      # it's designed so that if 2 writers run at the same time, at least
                      #   one increment will be lost, and we can detect that at the end
                      data  = value+1 
                    end
                  end
                end
              end

    readers.each { |t| t.join }
    writers.each { |t| t.join }
    puts "BAD!!! Readers+writers overlapped!" if mutex.synchronize { bad }
    puts "BAD!!! Writers overlapped!" if data != (n_writers * writer_iterations * 2)
  end
  puts result
end

test(ReadWriteLock.new)
test(SimpleMutex.new)
test(FreeAndEasy.new)
end
EN

回答 1

Code Review用户

回答已采纳

发布于 2012-02-29 09:44:08

您是否考虑过将读取器和写入器队列加入到单个编写器等待队列中?这可能使代码更容易同步,并在执行常量写入时解决与读者饥饿的问题,如下所示:

  1. 如果采取写锁或队列不为空,则新操作(读或写)进入队列。
  2. 如果没有获得写入锁,队列为空,则执行新的读取操作。新的写操作进入队列,等待读操作结束。
  3. 释放写锁时,从队列中运行下一个操作。如果这是读操作,请尝试运行下一个操作,直到遇到写操作/队列为空为止。如果启用了写入操作,则等待读取(见第2项)。

我认为您应该将计数器上的操作封装成单独的,比如ReadWriteCounter,抽象。它将在内部保存Atomic @counter实例,并通过new_writerwriter_running?等方法执行检查和操作。这将提高代码的可读性。

我在奔腾(R)双核CPU E5400 @ 2.70GHz机器上运行代码:

MRI Ruby 1.9.2-p290:

代码语言:javascript
复制
READ INTENSIVE (80% read, 20% write):
Testing ReadWriteLock with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.030000   0.030000   0.060000 (  0.499945)
WRITE INTENSIVE (80% write, 20% read):
Testing ReadWriteLock with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.030000   0.010000   0.040000 (  1.761913)
BALANCED (50% read, 50% write):
Testing ReadWriteLock with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.030000   0.010000   0.040000 (  1.121450)
READ INTENSIVE (80% read, 20% write):
Testing SimpleMutex with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.010000   0.020000   0.030000 (  2.125943)
WRITE INTENSIVE (80% write, 20% read):
Testing SimpleMutex with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.010000   0.010000   0.020000 (  2.114639)
BALANCED (50% read, 50% write):
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.010000   0.010000   0.020000 (  2.114838)
READ INTENSIVE (80% read, 20% write):
Testing FreeAndEasy with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.000000   0.000000   0.000000 (  0.058086)
WRITE INTENSIVE (80% write, 20% read):
Testing FreeAndEasy with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.010000   0.010000   0.020000 (  0.060335)
BALANCED (50% read, 50% write):
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.000000   0.000000   0.000000 (  0.057053)

Jruby 1.6.5.1:

代码语言:javascript
复制
READ INTENSIVE (80% read, 20% write):
Testing ReadWriteLock with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  0.875000   0.000000   0.875000 (  0.875000)
WRITE INTENSIVE (80% write, 20% read):
Testing ReadWriteLock with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  1.880000   0.000000   1.880000 (  1.880000)
BALANCED (50% read, 50% write):
Testing ReadWriteLock with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  1.201000   0.000000   1.201000 (  1.201000)
READ INTENSIVE (80% read, 20% write):
Testing SimpleMutex with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  2.196000   0.000000   2.196000 (  2.196000)
WRITE INTENSIVE (80% write, 20% read):
Testing SimpleMutex with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  2.219000   0.000000   2.219000 (  2.219000)
BALANCED (50% read, 50% write):
Testing SimpleMutex with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
  2.229000   0.000000   2.229000 (  2.229000)
READ INTENSIVE (80% read, 20% write):
Testing FreeAndEasy with 32 readers and 8 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.074000   0.000000   0.074000 (  0.074000)
WRITE INTENSIVE (80% write, 20% read):
Testing FreeAndEasy with 8 readers and 32 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.060000   0.000000   0.060000 (  0.060000)
BALANCED (50% read, 50% write):
Testing FreeAndEasy with 20 readers and 20 writers. Readers iterate 50 times, sleeping 0.001s each time, writers iterate 50 times, sleeping 0.001s each time
BAD!!! Readers+writers overlapped!
BAD!!! Writers overlapped!
  0.105000   0.000000   0.105000 (  0.104000)
票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/9186

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档