我有一些填充EventfulConcurrentQueue的代码,即ConcurrentQueue。因此,我需要将所有数据从EventfulConcurrentQueue保存到数据库中。
我认为应该使用某种多线程模型来排列项目。因此,EventfulConcurrentQueue应该从不同的线程中删除,这些线程获取EventfulConcurrentQueue的部分项,并将条目插入到数据库中。
到目前为止,我有以下代码。
EventfulConcurrentQueue的使用
public partial class FormMain: Form {
private EventfulConcurrentQueue < PacketItem > PacketQueue {
get;
set;
} = new EventfulConcurrentQueue < PacketItem > ();
private void FormMain_Load(object sender, EventArgs e) {
StartPersistingTask();
}
private async Task StartPersistingTask() {
var chunkOfPacketsToInsert = 100;
try {
while (true) {
if (PacketQueue.Count > chunkOfPacketsToInsert) {
int counter = 0;
var chunkToBulkInsert = DataHelper.CreateBulkCopyDataTable();
while (counter <= chunkOfPacketsToInsert) {
PacketItem packetItem = null;
PacketQueue.TryDequeue(out packetItem);
if (packetItem != null) {
chunkToBulkInsert.Rows.Add(Guid.NewGuid(), packetItem.AtTime, packetItem.GPSTrackerID, packetItem.PacketBytes);
counter++;
} else {
break;
}
};
//create object of SqlBulkCopy which help to insert
using
var bulkCopy = new SqlBulkCopy(ConfigurationManager.ConnectionStrings["MyDB"].ConnectionString, SqlBulkCopyOptions.Default);
bulkCopy.DestinationTableName = "MyTable";
try {
bulkCopy.WriteToServer(chunkToBulkInsert);
} catch (Exception ex) {
Log(ex, LogErrorEnums.Fatal);
} finally {
Log(LogErrorEnums.Info, $ "Items: {counter}");
}
} else {
await Task.Delay(100);
}
}
} catch (Exception ex) {
Log(ex, LogErrorEnums.Error);
}
}
}EventfulConcurrentQueue的定义
public sealed class EventfulConcurrentQueue < T > {
private readonly ConcurrentQueue < T > _queue;
public EventfulConcurrentQueue() {
_queue = new ConcurrentQueue < T > ();
}
public int Count {
get {
return _queue.Count;
}
}
public bool IsEmpty {
get {
return _queue.IsEmpty;
}
}
public void Enqueue(T item) {
_queue.Enqueue(item);
OnItemEnqueued();
}
public bool TryDequeue(out T result) {
var success = _queue.TryDequeue(out result);
if (success) {
OnItemDequeued(result);
}
return success;
}
public event EventHandler ItemEnqueued;
public event EventHandler < ItemDequeuedEventArgs < T >> ItemDequeued;
void OnItemEnqueued() {
ItemEnqueued?.Invoke(this, EventArgs.Empty);
}
void OnItemDequeued(T item) {
ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs < T > {
Item = item
});
}
}
public sealed class ItemDequeuedEventArgs < T >: EventArgs {
public T Item {
get;
set;
}
}
public sealed class PacketItem {
public int GPSTrackerID {
get;
set;
}
public DateTime AtTime {
get;
set;
}
public byte[] PacketBytes {
get;
set;
}
public int GPSTrackerTypeID {
get;
set;
}
/// <summary>
/// Parsed by device's protocol packet
/// </summary>
public object ParsedPacket {
get;
set;
}
}发布于 2021-05-04 09:54:41
我认为EventfulConcurrentQueue不是为这个用例设计的。
从消费者的角度看:
IsEmptyEnqueueItemEnqueuedItemDequeuedDequeueAtMostNItemsWaitUntilThereAreEnoughItems在删除了未使用的内容并添加了缺失的内容后,类如下所示:
public sealed class EventfulConcurrentQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
public List<T> DequeueAtMostNItems(int n)
{
List<T> result = new();
T current;
do
{
if(_queue.TryDequeue(out current))
result.Add(current);
} while (current != null && result.Count <= n);
return result;
}
public async Task WaitUntilThereAreEnoughItems(int numberOfItems)
{
while (_queue.Count < numberOfItems)
{
await Task.Delay(100);
}
return;
}
}有了这些方法,您就可以接受封装。操作和数据是相邻的。使用这种方法,您不必公开内部的任何内容,因此信息隐藏方面也包括在内。
如果您坚持这个实现,那么我建议重命名,因为ConcurrentQueue现在变成了实现细节。
用法可以简化如下:
while (true)
{
await PacketQueue.WaitUntilThereAreEnoughItems(chunkSize);
var chunk = PacketQueue.DequeueAtMostNItems(chunkSize);
DataTable toBeInserted = DataHelper.CreateBulkCopyDataTable();
chunk.ForEach(row => toBeInserted.Rows.Add(Guid.NewGuid(), row.AtTime, row.GPSTrackerID, row.PacketBytes));
using var bulkCopy = ...
}https://codereview.stackexchange.com/questions/260326
复制相似问题