首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >TThreadedQueue不能多个消费者吗?

TThreadedQueue不能多个消费者吗?
EN

Stack Overflow用户
提问于 2011-01-31 21:25:50
回答 5查看 7.1K关注 0票数 43

尝试在单个生产者多用户方案中使用TThreadedQueue (Generics.Collections)。(德尔菲-XE)。这样做的目的是将对象推到队列中,并让多个工作线程耗尽队列。

不过,它并不像预期的那样起作用。当两个或多个工作线程正在调用PopItem时,将从TThreadedQueue引发访问冲突。

如果使用关键部分序列化对PopItem的调用,则一切正常。

当然,TThreadedQueue应该能够处理多个消费者,所以我是不是遗漏了什么,或者这是TThreadedQueue中的一个纯bug?

下面是一个生成错误的简单示例。

代码语言:javascript
复制
program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

Update:在DelphiXE2中修复了导致TThreadedQueue崩溃的TMonitor错误。

更新2:上面的测试使队列处于空状态。达里安·米勒( Darian )发现,在完全状态下强调队列,仍然可以在XE2中重现错误。错误再次出现在TMonitor中。有关更多信息,请参见下面他的回答。也是QC101114的链接。

更新3:使用Delphi-XE2更新4,已经发布了一个修复TMonitor的方法,可以解决TThreadedQueue中的问题。到目前为止,我的测试还不能在TThreadedQueue中重现任何错误。当队列为空且满时,测试单个生产者/多个使用者线程。还测试了多个生产者/多个消费者。我将读者线程和写入线程从1更改到100,没有任何故障。但是知道了历史,我敢让别人打破TMonitor

EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2011-01-31 23:45:11

如果没有大量的测试,很难确定这是一个错误,无论是在TThreadedQueue中还是在TMonitor中。无论哪种方式,它都在RTL中,而不是在您的代码中。您应该将此作为QC报告存档,并使用上面的示例作为“如何复制”代码。

票数 19
EN

Stack Overflow用户

发布于 2011-02-01 08:22:48

我建议您在使用线程、并行性等时使用OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary。Primoz做得很好,在站点上您会发现很多有用的文档。

票数 10
EN

Stack Overflow用户

发布于 2011-11-17 22:08:36

您的示例在XE2下似乎运行得很好,但是如果我们在PushItem上填充您的队列,它就会失败。(在XE2 Update1下测试)

若要复制,只需将任务创建从100增加到1100 (队列深度设置为1024)。

代码语言:javascript
复制
for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

每次在Windows 7上,我都会死掉。我一开始试着不断地对它进行压力测试,它在65循环的循环30...then (循环16...then )上失败了,所以在不同的时间间隔内,但是在某个时候它一直失败。

代码语言:javascript
复制
  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/4856306

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档