首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在ConcurrentQueue中清除C#的多线程模型

在ConcurrentQueue中清除C#的多线程模型
EN

Code Review用户
提问于 2021-05-04 07:38:27
回答 1查看 565关注 0票数 2

我有一些填充EventfulConcurrentQueue的代码,即ConcurrentQueue。因此,我需要将所有数据从EventfulConcurrentQueue保存到数据库中。

我认为应该使用某种多线程模型来排列项目。因此,EventfulConcurrentQueue应该从不同的线程中删除,这些线程获取EventfulConcurrentQueue的部分项,并将条目插入到数据库中。

到目前为止,我有以下代码。

EventfulConcurrentQueue

的使用

代码语言:javascript
复制
public partial class FormMain: Form {

  private EventfulConcurrentQueue < PacketItem > PacketQueue {
    get;
    set;
  } = new EventfulConcurrentQueue < PacketItem > ();

  private void FormMain_Load(object sender, EventArgs e) {
    StartPersistingTask();
  }

  private async Task StartPersistingTask() {
    var chunkOfPacketsToInsert = 100;

    try {
      while (true) {
        if (PacketQueue.Count > chunkOfPacketsToInsert) {
          int counter = 0;
          var chunkToBulkInsert = DataHelper.CreateBulkCopyDataTable();

          while (counter <= chunkOfPacketsToInsert) {
            PacketItem packetItem = null;
            PacketQueue.TryDequeue(out packetItem);
            if (packetItem != null) {
              chunkToBulkInsert.Rows.Add(Guid.NewGuid(), packetItem.AtTime, packetItem.GPSTrackerID, packetItem.PacketBytes);
              counter++;
            } else {
              break;
            }
          };

          //create object of SqlBulkCopy which help to insert  
          using
          var bulkCopy = new SqlBulkCopy(ConfigurationManager.ConnectionStrings["MyDB"].ConnectionString, SqlBulkCopyOptions.Default);
          bulkCopy.DestinationTableName = "MyTable";
          try {
            bulkCopy.WriteToServer(chunkToBulkInsert);
          } catch (Exception ex) {
            Log(ex, LogErrorEnums.Fatal);
          } finally {
            Log(LogErrorEnums.Info, $ "Items: {counter}");
          }
        } else {
          await Task.Delay(100);
        }
      }
    } catch (Exception ex) {
      Log(ex, LogErrorEnums.Error);
    }
  }

}

EventfulConcurrentQueue

的定义

代码语言:javascript
复制
public sealed class EventfulConcurrentQueue < T > {
  private readonly ConcurrentQueue < T > _queue;

  public EventfulConcurrentQueue() {
    _queue = new ConcurrentQueue < T > ();
  }

  public int Count {
    get {
      return _queue.Count;
    }
  }

  public bool IsEmpty {
    get {
      return _queue.IsEmpty;
    }
  }

  public void Enqueue(T item) {
    _queue.Enqueue(item);
    OnItemEnqueued();
  }

  public bool TryDequeue(out T result) {
    var success = _queue.TryDequeue(out result);

    if (success) {
      OnItemDequeued(result);
    }
    return success;
  }

  public event EventHandler ItemEnqueued;
  public event EventHandler < ItemDequeuedEventArgs < T >> ItemDequeued;

  void OnItemEnqueued() {
    ItemEnqueued?.Invoke(this, EventArgs.Empty);
  }

  void OnItemDequeued(T item) {
    ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs < T > {
      Item = item
    });
  }
}

public sealed class ItemDequeuedEventArgs < T >: EventArgs {
  public T Item {
    get;
    set;
  }
}

public sealed class PacketItem {
  public int GPSTrackerID {
    get;
    set;
  }
  public DateTime AtTime {
    get;
    set;
  }

  public byte[] PacketBytes {
    get;
    set;
  }

  public int GPSTrackerTypeID {
    get;
    set;
  }

  /// <summary>
  /// Parsed by device's protocol packet
  /// </summary>
  public object ParsedPacket {
    get;
    set;
  }
}
EN

回答 1

Code Review用户

回答已采纳

发布于 2021-05-04 09:54:41

我认为EventfulConcurrentQueue不是为这个用例设计的。

从消费者的角度看:

  • 不使用的东西--
    • IsEmpty
    • Enqueue
    • ItemEnqueued
    • ItemDequeued

  • 缺少的东西
    • DequeueAtMostNItems
    • WaitUntilThereAreEnoughItems

在删除了未使用的内容并添加了缺失的内容后,类如下所示:

代码语言:javascript
复制
public sealed class EventfulConcurrentQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public List<T> DequeueAtMostNItems(int n)
    {
        List<T> result = new();
        T current;
        do
        {
            if(_queue.TryDequeue(out current))
                result.Add(current);
        } while (current != null && result.Count <= n);

        return result;
    }

    public async Task WaitUntilThereAreEnoughItems(int numberOfItems)
    {
        while (_queue.Count < numberOfItems)
        {
            await Task.Delay(100);
        }
        return;
    }
}

有了这些方法,您就可以接受封装。操作和数据是相邻的。使用这种方法,您不必公开内部的任何内容,因此信息隐藏方面也包括在内。

如果您坚持这个实现,那么我建议重命名,因为ConcurrentQueue现在变成了实现细节。

用法可以简化如下:

代码语言:javascript
复制
while (true)
{
    await PacketQueue.WaitUntilThereAreEnoughItems(chunkSize);
                    
    var chunk = PacketQueue.DequeueAtMostNItems(chunkSize);
    DataTable toBeInserted = DataHelper.CreateBulkCopyDataTable();
    chunk.ForEach(row => toBeInserted.Rows.Add(Guid.NewGuid(), row.AtTime, row.GPSTrackerID, row.PacketBytes));
 
    using var bulkCopy = ...
}
票数 3
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/260326

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档