我有一个在多个线程上并发处理的命令列表。该列表是静态的,每个线程生成其输出,并且不干扰其他线程,因此到目前为止一切正常。
有些命令需要在命令附带的数据集上进行复杂的计算。所有线程的计算结果都是相同的。此时,每个线程在到达命令时执行计算,但这是浪费时间和资源。
我想做的是只执行一次计算,并在线程之间共享结果。首先到达命令的线程启动计算,到达命令的其他线程等待计算完成,然后使用结果。
我对线程同步几乎没有经验,我不知道在这个场景中应该使用哪些同步原语,以及应该将锁放在什么位置进行计算。您能告诉我在这个场景中应该使用哪些类(用于同步),线程应该在哪里等待,以及在哪个对象上?
我的代码如下所示:
private void ThreadFunc(object state)
{
Context ctx = (Context)state;
Command cmd = ctx.CommandList;
Processor proc = ctx.Processor;
while (cmd != null)
{
switch(cmd.Type)
{
case CommandType.Simple:
proc.ExecuteSimpleCommand(cmd);
break;
case CommandType.Complex:
cmd.Data = ComputeData(cmd.Dataset);
proc.ExecuteComplexCommand(cmd);
break;
}
cmd = cmd.Next;
}
}ComputeData方法执行复杂的计算,结果存储在命令中。
此时,代码有一个问题,因为每个线程都在同一个Command对象上设置数据属性,但是由于计算结果对于所有线程都是相同的,所以代码可以工作。
我在想这样的事情,但我不确定它是否正确:
case CommandType.Complex:
lock (cmd)
{
if (cmd.Data == null)
{
cmd.Data = ComputeData(cmd.Dataset);
}
}
proc.ExecuteComplexCommand(cmd);
break;编辑:我现在对.NET 2.0有限制。
编辑2:列表是固定的,它的元素不会改变。线程只读取列表,不修改列表。
我将尝试一个示例:列表包含元素A、B和C,它由线程T1和T2处理。当T1到达B时,它调用ComputeData方法并将结果存储在B的属性中。当T2到达B时,它等待计算结束(假设T1之前到达B,并且已经调用了ComputeData),并使用该结果。这就是我想要达到的目标。
发布于 2016-05-09 12:07:44
基本上,您正在尝试抓取链接列表的头,然后移动到下一个位置。显然,‘抓取’是这里的问题;很容易将其循环起来。
带锁的
简单的解决方案是使用锁:
private static object lockObject = new object();
// ...
Command current;
lock (lockObject)
{
current = CommandList;
CommandList = CommandList.Next;
}
// use current.或者,您可以使用自旋锁。
无锁
虽然没有锁的线程安全代码总是很棘手,但我的尝试如下(警告:这里可能有bug;我还没有彻底检查代码!):
Thread.MemoryBarrier(); // read barrier
var list = CommandList;
if (list != null)
{
var next = list.Next;
if (Interlocked.CompareExchange(ref CommandList, next, list) == list)
{
// execute code on 'list'.
}
else
{
// something changed. Try again.
}
},让我们改变这个问题.
有时对话会让事情更加混乱..。
首先到达命令的线程启动计算,到达命令的其他线程等待计算完成,然后使用结果。
和
简单的命令可以在没有问题的情况下并行运行。复杂的命令也可以并行运行(这就是我现在所做的),但是在每个线程上执行相同的计算,这是浪费时间。
让我们直截了当地说:假设我们有链A->B->C和A和C简单的命令,B是一个复杂的命令。我们希望A,B并行运行,在B完成后运行C。在执行B时,我们都应该等待它完成。
我们想到了一个简单的解决方案:让我们假设简单命令没有数据,复杂命令也有数据。您还说过,列表是在调用它之前创建的。这意味着我们不需要做太多的同步。
基本上,您可以这样做:
var current = this.CommandList;
while (current != null)
{
if (current.Data != null) // is it a complex command? (B)
{
lock (current.Data) // all threads wait here except one
{
if (current.Executed) // execute it once.
{
// Go ahead and execute it, single threaded
// [code]
current.Executed = true;
}
}
}
else
{
bool executeHere = false;
// simple command.
lock (lockObject) // shared lock object
{
executeHere = !command.Executed; // execute it in this thread?
command.Executed = true;
}
// will be true in 1 thread only, but multiple A's/C's can be executed in parallel.
if (executeHere)
{
// execute simple command
// [code]
}
}
current = current.Next;
}发布于 2016-05-09 22:03:01
试试这个:
namespace ConsoleApplication6 {
public class Context
{
public Command CommandList { get; set; }
public Processor Processor { get; set; }
}
public class Processor
{
public void ExecuteSimpleCommand(Command command)
{
}
public void ExecuteComplexCommand(Command command) {
}
}
public enum CommandType
{
Simple,
Complex
}
public class Command {
public CommandType Type { get; set; }
public Command Next { get; set; }
public object Data { get; set; }
}
class Program
{
private readonly object signalObject = new object();
private object computationResult;
private int sharedPriority = -1;
static void Main(string[] args) {
}
private void ThreadFunc(object state) {
Context ctx = (Context)state;
Command cmd = ctx.CommandList;
Processor proc = ctx.Processor;
while (cmd != null) {
switch (cmd.Type) {
case CommandType.Simple:
proc.ExecuteSimpleCommand(cmd);
break;
case CommandType.Complex:
//each thread will atomically increment the sharedPriority; only the thread with priority=0 (we start from -1) will do the heavy computation
cmd.Data = ComputeData(null, Interlocked.Increment(ref sharedPriority)); //pass around your input
proc.ExecuteComplexCommand(cmd);
break;
}
cmd = cmd.Next;
}
}
private object ComputeData(object input, int priority)
{
// we allow only the thread with the priority 0 to perform the heavy computation
if (priority != 0)
{
// for all other threads, it will wait for the thread that got the chance to do the heavy computation to signal
Monitor.Wait(signalObject);
return computationResult;
}
//heavy computation here
computationResult = new object(); // .. implement your heavy logic here
Monitor.PulseAll(signalObject); // we've computed the result, let all other threads to do use the heavy computation result
return computationResult;
}
}
}您必须确保computationResult不会被线程更改(只读取),否则您将得到一些恶劣的竞争条件。
发布于 2016-05-09 12:26:48
我不太明白这个问题。但是,常见的情况是,当您有一个工作项列表并希望在多个线程中处理它们时。在C#中,您可以使用一些线程安全集合。如ConcurrentDictionary,ConcurrentBag,ConcurrentQueue,ConcurrentStack等。详情请参阅MSDN。
所有这些类都实现了内部锁定。你只需要选择一个更适合你的情况。下面是一个ConcurrentQueue示例:
class Program
{
static long _total;
static ConcurrentQueue<int> _queued;
static void Main(string[] args)
{
IEnumerable<int> numbers = Enumerable.Range(1, 1000000);
_queued = new ConcurrentQueue<int>(numbers);
_total = 0;
Task task1 = Task.Run(() => ProcessQueue());
Task task2 = Task.Run(() => ProcessQueue());
Task.WaitAll(
Task.Run(() => ProcessQueue()),
Task.Run(() => ProcessQueue()));
Console.WriteLine("Total: {0}", _total);
}
static void ProcessQueue()
{
int value;
while (_queued.TryDequeue(out value))
{
Interlocked.Add(ref _total, value);
}
}
}https://stackoverflow.com/questions/37112188
复制相似问题