请看下面的代码。这是我理解并发应用程序的尝试。
class Program
{
static void Main(string[] args)
{
var consumers = new List<Consumer>
{
new Consumer(1000),
new Consumer(900),
new Consumer(800),
new Consumer(700),
new Consumer(600),
};
var resourceManager = new ResourceManager(consumers);
resourceManager.Process();
}
}
public class Resource
{
private int Capacity { get; set; } = 1000;
private Guid? _currentConsumer;
public int GetCapacity(Guid? id)
{
while (id.HasValue && _currentConsumer.HasValue && id != _currentConsumer)
{
Thread.Sleep(5);
}
_currentConsumer = id;
return Capacity;
}
public void SetCapacity(int cap, Guid id)
{
if (_currentConsumer.HasValue && id != _currentConsumer)
return;
Capacity = cap;
_currentConsumer = null;
}
}
public class Consumer
{
private readonly int _sleep;
private Guid _id = Guid.NewGuid();
public Consumer(int sleep)
{
_sleep = sleep;
}
public void ConsumeResource(Resource resource)
{
var capture = resource.GetCapacity(_id);
Thread.Sleep(_sleep); // some calsulations and stuff
if (resource.GetCapacity(_id) != capture)
throw new SystemException("Something went wrong");
resource.SetCapacity(resource.GetCapacity(_id) - 1, _id);
}
}
public class ResourceManager
{
private readonly List<Consumer> _consumers;
public readonly Resource _resource;
public ResourceManager(List<Consumer> consumers)
{
_consumers = consumers;
_resource = new Resource();
}
public void Process()
{
Parallel.For(0, _consumers.Count, i =>
{
var consumer = _consumers[i];
consumer.ConsumeResource(_resource);
});
}
}来自Consumer::ConsumeResource的代码行
Thread.Sleep(_sleep); // some calsulations and stuff
if (resource.GetCapacity(_id) != capture)
throw new SystemException("Something went wrong");用于模拟实际的应用程序情况,当有几个使用者可能使用相同的资源,而且如果资源的状态在计算过程中发生变化(可能是由另一个使用者),那么该计算可能会中断。
为此,我创建了一个解决方案,使用好的旧Thread.Sleep,因此在另一个使用者已经使用它时锁定资源,但是我觉得有更正确的方法来实现同样的目标。
期待你的复习笔记!
在对locks等人做了一些研究之后,我写了一个小帮助者类:
public class ConcurrentAccessProvider<TObject>
{
private readonly Func<TObject> _getter;
private readonly Action<TObject> _setter;
private readonly object _lock = new object();
public ConcurrentAccessProvider(Func<TObject> getter, Action<TObject> setter)
{
_getter = getter;
_setter = setter;
}
public TObject Get()
{
lock (_lock)
{
return _getter();
}
}
public void Set(TObject value)
{
lock (_lock)
{
_setter(value);
}
}
public void Access(Action accessAction)
{
lock (_lock)
{
accessAction();
}
}
}使用该方法,我重写了Resource和Consumer,以使其线程安全:
public class Resource
{
public ConcurrentAccessProvider<int> CapacityAccessProvider { get; }
private int _capacity;
public Resource()
{
CapacityAccessProvider = new ConcurrentAccessProvider<int>(() => _capacity, val => _capacity = val);
}
public int Capacity
{
get => CapacityAccessProvider.Get();
set => CapacityAccessProvider.Set(value);
}
}
public class Consumer
{
private readonly int _sleep;
public Consumer(int sleep)
{
_sleep = sleep;
}
public void ConsumeResource(Resource resource)
{
resource.CapacityAccessProvider.Access(() =>
{
var capture = resource.Capacity;
Thread.Sleep(_sleep); // some calsulations and stuff
if (resource.Capacity != capture)
throw new SystemException("Something went wrong");
resource.Capacity -= 1;
Console.WriteLine(resource.Capacity);
});
}
}在提供的示例中,这些操作有效地扼杀了来自并发的所有可能的利润,但这是因为只有一个Resource实例。在实际应用程序中,当有数千个资源,并且只有几个冲突的情况时,这将很好地工作。
尽管如此,我仍然想知道如何改进与并发相关的代码!
发布于 2018-09-25 12:36:58
让我们首先检查ConsumeResource的内容,以了解为什么需要某种形式的locking (现在由resource.CapacityAccessProvider.Access提供)。ConsumeResource会这样做:
capture = resource.Capacity的可用资源总数Thread.Sleep),假设没有人改变可用的资源。capture相同,如果没有抛出。如果没有使用locking,问题将是两个或多个使用者(线程)可以同时到达通过所有检查的resource.SetCapacity(resource.GetCapacity(_id) - 1, _id);。假设有10个资源(例如,钢铁)和2个工人(生产钉子),两个线程都可以到达这条线,将总资源更改为10-1=9,但它应该是8!SetCapacity(resource.GetCapacity(_id) - 1)由三个操作组成: get、substract和set。所有线程都将get 10,substract 1,并到达有9在手的set。
resource.CapacityAccessProvider.Access通过锁定整个进程避免了这种争用条件,并且您正确地识别了它不会并发运行(现在,当代码被Access__包装时,它原来并不在那里)。问题是:工作本身是否需要访问共享状态?或者只是:
区别在于,只需要锁定第一部分:
public void ConsumeResource(Resource resource)
{
// acquire - only one thread can do this, other threads must wait
lock(resource) // or resource.CapacityAccessProvider.Access(() =>
{
var capture = resource.GetCapacity(_id);
if (capture < 1)
throw new InvalidOperationException("Not enough resource");
resource.SetCapacity(capture - 1, _id);
}
// consume (work) - any number of threads can do this
Thread.Sleep(_sleep);
// maybe output:
// lock(output) output.Add(nail);
}另一种选择是使用原子性操作,而收购可能会成为Resource的一部分(这是某种程度上的资源管理器,而ResourceManager则是流程管理器)。
https://codereview.stackexchange.com/questions/204220
复制相似问题