我有一个批量导入的要求。文件可以包含数千条记录,并且每条记录都需要验证。用户希望收到有多少条记录无效的通知。最初,我使用Ruby的Mutex和Redis的发布/订阅实现了这一点。请注意,我有20个并发线程通过Sidekiq处理每个记录:
class Record < ActiveRecord::Base
class << self
# invalidated_records is SHARED memory for the Sidekiq worker threads
attr_accessor :invalidated_records
attr_accessor :semaphore
end
def self.batch_import
self.semaphore = Mutex.new
self.invalid_records = []
redis.subscribe_with_timeout(180, 'validation_update') do |on|
on.message do |channel, message|
if message.to_s =~ /\d+|import_.+/
self.semaphore.synchronize {
self.invalidated_records << message
}
elsif message == 'exit'
redis.unsubscribe
end
end
end
end
endSidekiq将发布到Record对象:
Redis.current.publish 'validation_update', 'import_invalid_address'问题是发生了一些奇怪的事情。Record.invalidated_records中不会填充所有无效的导入。他们中的许多人都是,但不是全部。我认为这是因为多个线程试图并发地更新对象,它污染了对象。我认为互斥锁可以解决这个问题。但是在添加互斥锁之后,并不是所有的无效都会在Record.invalidated_records中被填充。
最后,我使用redis原子递减和递增来跟踪无效的导入,这非常有效。但是我很好奇Ruby Mutex和试图更新Record.invalidated_records的多线程有什么问题?
发布于 2018-04-14 13:33:40
我没有使用过互斥锁,但我认为发生的情况是线程看到信号量被锁定并跳过保存<<消息,需要使用https://apidock.com/ruby/ConditionVariable等待互斥锁解锁,然后保存数据
https://stackoverflow.com/questions/49826254
复制相似问题