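I have a websocket connection to a third-party API. The API returns a lot of data: at peak times it sends hundreds of messages per second. I need to process the data for two purposes: save all of it in the DB and send some of it to RabbitMQ.
The idea is as follows: I want to save data to the DB when the batch size reaches 1000 or when a timeout elapses, with the timeout set to 3 seconds.
I want to publish data to RabbitMQ on a 1-second timeout (the code below also caps publish batches at 500 records). This is a kind of throttling, because there is a lot of data. In addition, I need to pick the last record for each ticker, which I already do in the ActionBlock. For example, suppose the batch contains the following records:
{"Ticker":"MSFT","DateTime":"2019-05-14T10:00:00:100"}
{"Ticker":"MSFT","DateTime":"2019-05-14T10:00:00:150"}
{"Ticker":"AAPL","DateTime":"2019-05-14T10:00:00:300"}
I only need to publish the last message for each ticker, so after filtering I will publish 2 records:
{"Ticker":"MSFT","DateTime":"2019-05-14T10:00:00:150"}
{"Ticker":"AAPL","DateTime":"2019-05-14T10:00:00:300"}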
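The per-ticker filtering described above can be checked in isolation with LINQ. This is a minimal sketch: `Quote` is a hypothetical stand-in for `StreamingMessage` (only `Ticker` and `DateTime` are assumed), the sample timestamps are modeled as `DateTime` values with milliseconds, and the record syntax assumes C# 9 or later. The query mirrors the one in the question's ActionBlock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for StreamingMessage: only the two properties the filter needs.
public record Quote(string Ticker, DateTime DateTime);

public static class LatestPerTicker
{
    // Keep only the newest record for each ticker, as in the question's GroupBy query.
    public static List<Quote> Filter(IEnumerable<Quote> batch) =>
        batch.GroupBy(q => q.Ticker)
             .Select(g => g.OrderByDescending(q => q.DateTime).First())
             .ToList();

    // The three sample records from the question.
    public static List<Quote> SampleBatch() => new List<Quote>
    {
        new Quote("MSFT", new DateTime(2019, 5, 14, 10, 0, 0, 100)),
        new Quote("MSFT", new DateTime(2019, 5, 14, 10, 0, 0, 150)),
        new Quote("AAPL", new DateTime(2019, 5, 14, 10, 0, 0, 300)),
    };
}
```

`GroupBy` yields groups in the order their keys first appear, so the result is MSFT (the .150 record) followed by AAPL.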
Full code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;

public class StreamMessagePipeline<T> where T : StreamingMessage
{
    private readonly BatchBlock<T> _saveBatchBlock;
    private readonly BatchBlock<T> _publishBatchBlock;

    public StreamMessagePipeline()
    {
        _saveBatchBlock = new BatchBlock<T>(1000);
        _publishBatchBlock = new BatchBlock<T>(500);
        SetupSaveBatchPipeline();
        SetupPublishBatchPipeline();
    }

    private void SetupSaveBatchPipeline()
    {
        var saveBatchTimeOut = TimeSpan.FromSeconds(3);
        var saveBatchTimer = new Timer(saveBatchTimeOut.TotalMilliseconds);
        // Flush a partial batch when the timeout elapses
        saveBatchTimer.Elapsed += (s, e) => _saveBatchBlock.TriggerBatch();
        var actionBlockSave = new ActionBlock<IEnumerable<T>>(x =>
        {
            // Reset the timeout since we got a batch
            saveBatchTimer.Stop();
            saveBatchTimer.Start();
            Console.WriteLine($"Save to DB : {x.Count()}");
        });
        _saveBatchBlock.LinkTo(actionBlockSave, new DataflowLinkOptions
        {
            PropagateCompletion = true
        });
        saveBatchTimer.Start(); // System.Timers.Timer is created stopped
    }

    private void SetupPublishBatchPipeline()
    {
        var publishBatchTimeOut = TimeSpan.FromSeconds(1);
        var publishBatchTimer = new Timer(publishBatchTimeOut.TotalMilliseconds);
        publishBatchTimer.Elapsed += (s, e) => _publishBatchBlock.TriggerBatch();
        var actionBlockPublic = new ActionBlock<IEnumerable<T>>(x =>
        {
            // Keep only the latest record per ticker
            var res = x.GroupBy(d => d.Ticker)
                       .Select(d => d.OrderByDescending(s => s.DateTime).FirstOrDefault())
                       .ToList();
            Console.WriteLine($"Publish data to somewhere : {res.Count}");
            // Reset the timeout since we got a batch
            publishBatchTimer.Stop();
            publishBatchTimer.Start();
        });
        _publishBatchBlock.LinkTo(actionBlockPublic, new DataflowLinkOptions
        {
            PropagateCompletion = true
        });
        publishBatchTimer.Start();
    }

    public async Task Handle(T record)
    {
        await _saveBatchBlock.SendAsync(record);
        await _publishBatchBlock.SendAsync(record);
    }
}
Posted on 2019-05-14 15:35:00
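General comments: I would move the timeout and batch size into private fields. I don't know which version of C# you are using, but you could create a class for them, or, if you have good tuple support, you could declare them like this: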
private readonly (TimeSpan timeout, int buffer) _saveBatchSettings = (TimeSpan.FromSeconds(3), 1000);
private readonly (TimeSpan timeout, int buffer) _publishBatchSettings = (TimeSpan.FromSeconds(1), 500);
Having these outside the main code is nice, because when batching you usually have to tweak the timeout and size to find the sweet spot, and this makes it easier to find what needs to change.
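For the "create a class" option, a minimal sketch could look like the following. `BatchSettings` is a hypothetical name, not something from the original code; the values are the ones used in the question.

```csharp
using System;

// Hypothetical settings type: groups the two knobs that usually get tuned together.
public sealed class BatchSettings
{
    public BatchSettings(TimeSpan timeout, int size)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        Timeout = timeout;
        Size = size;
    }

    public TimeSpan Timeout { get; }
    public int Size { get; }

    // Values from the question: save every 1000 items or 3 s, publish every 500 items or 1 s.
    public static readonly BatchSettings Save = new BatchSettings(TimeSpan.FromSeconds(3), 1000);
    public static readonly BatchSettings Publish = new BatchSettings(TimeSpan.FromSeconds(1), 500);
}
```

Compared with the tuple, a class lets you validate the values once in the constructor.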
Also, you never complete the source. I would suggest making the class IDisposable and marking the source as complete in Dispose.
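Completing the head of the pipeline matters because a BatchBlock holds on to a partial batch until it is full, triggered, or completed. The following is a minimal sketch using `int` items in place of the real messages (`CompletionDemo` is a hypothetical helper, and the System.Threading.Tasks.Dataflow package is assumed). With `PropagateCompletion = true`, calling `Complete()` flushes the leftover items as a final, smaller batch, and awaiting the ActionBlock's `Completion` tells you when everything has been processed.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Sends `count` items into a BatchBlock of size 1000 and returns the batch sizes the consumer saw.
    public static async Task<List<int>> RunAsync(int count)
    {
        var batchSizes = new List<int>();
        var batchBlock = new BatchBlock<int>(1000);
        // ActionBlock runs one item at a time by default, so the list is never touched concurrently.
        var saveBlock = new ActionBlock<int[]>(batch => batchSizes.Add(batch.Length));
        batchBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < count; i++)
            await batchBlock.SendAsync(i);

        // Completing the head emits the partial batch and, via PropagateCompletion,
        // completes the ActionBlock after it has processed every batch.
        batchBlock.Complete();
        await saveBlock.Completion;
        return batchSizes;
    }
}
```

Sending 2500 items yields batches of 1000, 1000 and 500; without the `Complete()` call the last 500 would wait for a batch that never fills.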
TPL Dataflow comments: Why not use a BroadcastBlock linked to both BatchBlocks? Then it is one send instead of two.
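A sketch of the single-send fan-out, assuming `int` payloads and hypothetical counting consumers in place of the DB save and the RabbitMQ publish (`FanOutDemo` is illustrative only). The BroadcastBlock offers each message to every linked target; both BatchBlocks here are unbounded and greedy, so they accept every message. With bounded targets, a BroadcastBlock may overwrite a message before a slow target takes it.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutDemo
{
    // One SendAsync per record; the BroadcastBlock offers it to both batch pipelines.
    public static async Task<(int saved, int published)> RunAsync(int count)
    {
        int saved = 0, published = 0;
        var source = new BroadcastBlock<int>(x => x); // identity clone is fine for immutable messages
        var saveBatch = new BatchBlock<int>(1000);
        var publishBatch = new BatchBlock<int>(500);
        // Each ActionBlock processes one batch at a time, so each counter has a single writer.
        var save = new ActionBlock<int[]>(b => saved += b.Length);
        var publish = new ActionBlock<int[]>(b => published += b.Length);

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(saveBatch, options);
        source.LinkTo(publishBatch, options);
        saveBatch.LinkTo(save, options);
        publishBatch.LinkTo(publish, options);

        for (var i = 0; i < count; i++)
            await source.SendAsync(i);

        source.Complete();
        await Task.WhenAll(save.Completion, publish.Completion);
        return (saved, published);
    }
}
```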
I would move this:
var res = x.GroupBy(d => d.Ticker).Select(d => d.OrderByDescending(s => s.DateTime).FirstOrDefault()).ToList();
into a TransformBlock, and have the ActionBlock contain only the code that publishes the result.
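For the ActionBlock, and this is just my preference, I create a method for the action block to call instead of using an inline lambda. That code is usually the processing code, and it is easier to maintain and read when it lives in its own method, outside the pipeline setup.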
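The TransformBlock/ActionBlock split and the named-method style can be sketched together. `Tick`, `PublishStage`, `KeepLatest` and `PublishBatch` are hypothetical names; the publish step only records messages in a list where the real code would publish to RabbitMQ. The record type assumes C# 9 or later, and System.Threading.Tasks.Dataflow is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type; only Ticker and DateTime are assumed.
public record Tick(string Ticker, DateTime DateTime);

public sealed class PublishStage
{
    public List<Tick> Published { get; } = new List<Tick>();

    // Transform: keep only the latest tick per ticker (pure, easy to unit test).
    private static IList<Tick> KeepLatest(IList<Tick> batch) =>
        batch.GroupBy(t => t.Ticker)
             .Select(g => g.OrderByDescending(t => t.DateTime).First())
             .ToList();

    // Action: only the side effect lives here (a RabbitMQ publish in the real pipeline).
    private void PublishBatch(IList<Tick> messages) => Published.AddRange(messages);

    public async Task RunAsync(IEnumerable<IList<Tick>> batches)
    {
        var transform = new TransformBlock<IList<Tick>, IList<Tick>>(KeepLatest);
        var publish = new ActionBlock<IList<Tick>>(PublishBatch);
        transform.LinkTo(publish, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var batch in batches)
            await transform.SendAsync(batch);

        transform.Complete();
        await publish.Completion;
    }
}
```

Keeping `KeepLatest` static and free of I/O means the filtering can be tested without any blocks at all.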
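Reactive Extensions: If you are willing to add Reactive Extensions to your project, you can mix Rx and TPL Dataflow blocks. Rx has a built-in buffering operator called Buffer, and Dataflow blocks have AsObservable and AsObserver for switching between Rx and TPL. Here is an example of combining Rx with TPL Dataflow: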
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class StreamMessagePipeline<T> : IDisposable
    where T : StreamingMessage
{
    private readonly BroadcastBlock<T> _source;
    private readonly (TimeSpan timeout, int buffer) _saveBatchSettings = (TimeSpan.FromSeconds(3), 1000);
    private readonly (TimeSpan timeout, int buffer) _publishBatchSettings = (TimeSpan.FromSeconds(1), 500);

    public StreamMessagePipeline()
    {
        _source = new BroadcastBlock<T>(x => x);
        SetupSaveBatchPipeline();
        SetupPublishBatchPipeline();
    }

    public async Task Handle(T record)
    {
        await _source.SendAsync(record);
    }

    private void SetupSaveBatchPipeline()
    {
        var actionBlockSave = new ActionBlock<IList<T>>(SaveBatch);
        // Instead of an action block you could stay in Rx and just use Subscribe
        _source.AsObservable()
            .Buffer(_saveBatchSettings.timeout, _saveBatchSettings.buffer)
            .Where(x => x.Count > 0) // unlike TriggerBatch, Buffer will send out an empty list
            .Subscribe(actionBlockSave.AsObserver());
    }

    private void SetupPublishBatchPipeline()
    {
        var transformBlock = new TransformBlock<IList<T>, IList<T>>(x =>
        {
            return x.GroupBy(d => d.Ticker)
                .Select(d => d.OrderByDescending(s => s.DateTime).FirstOrDefault())
                .ToList();
        });
        var actionBlockPublish = new ActionBlock<IList<T>>(PublishBatch);
        transformBlock.LinkTo(actionBlockPublish, new DataflowLinkOptions
        {
            PropagateCompletion = true,
        });
        _source.AsObservable()
            .Buffer(_publishBatchSettings.timeout, _publishBatchSettings.buffer)
            .Where(x => x.Count > 0) // unlike TriggerBatch, Buffer will send out an empty list
            .Subscribe(transformBlock.AsObserver());
    }

    private void SaveBatch(IList<T> messages)
    {
        // Save the batch
    }

    private void PublishBatch(IList<T> messages)
    {
        // Publish the batch
    }

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                _source.Complete();
            }
            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    #endregion
}
Update: If you don't want to use Rx, you can still build a custom block with the same functionality.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class TimerBatchBlock
{
    public static IPropagatorBlock<T, T[]> Create<T>(int batchSize, TimeSpan timeSpan, GroupingDataflowBlockOptions options = null)
    {
        var batchBlock = new BatchBlock<T>(batchSize, options ?? new GroupingDataflowBlockOptions());
        var broadCastBlock = new BroadcastBlock<T[]>(x => x);
        var bufferBlock = new BufferBlock<T[]>();

        // timer setup (System.Threading.Timer)
        var timer = new Timer(x => ((BatchBlock<T>)x).TriggerBatch(), batchBlock, timeSpan, timeSpan);
        var resetTimerBlock = new ActionBlock<T[]>(_ => timer.Change(timeSpan, timeSpan)); // reset timer each time buffer outputs
        resetTimerBlock.Completion.ContinueWith(_ => timer.Dispose());

        // link everything up
        var linkOptions = new DataflowLinkOptions
        {
            PropagateCompletion = true,
        };
        broadCastBlock.LinkTo(resetTimerBlock, linkOptions);
        broadCastBlock.LinkTo(bufferBlock, linkOptions);
        batchBlock.LinkTo(broadCastBlock, linkOptions);

        return DataflowBlock.Encapsulate(batchBlock, bufferBlock);
    }
}
You can then link BroadcastBlock -> TimerBatchBlock -> ActionBlock.
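The custom block relies on `BatchBlock<T>.TriggerBatch()` emitting a partial batch when the timer fires. That behavior can be checked deterministically without a timer; `TriggerBatchDemo` is a hypothetical helper, and the System.Threading.Tasks.Dataflow package is assumed.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TriggerBatchDemo
{
    // Posts fewer items than the batch size, then forces a batch the way the timer callback does.
    public static async Task<int> PartialBatchSizeAsync(int items, int batchSize)
    {
        var batchBlock = new BatchBlock<int>(batchSize);
        for (var i = 0; i < items; i++)
            batchBlock.Post(i);

        batchBlock.TriggerBatch();                    // what the System.Threading.Timer callback calls
        int[] batch = await batchBlock.ReceiveAsync(); // the partial batch is now available
        return batch.Length;
    }
}
```

With 3 items and a batch size of 500, the forced batch contains 3 items; without `TriggerBatch()` the `ReceiveAsync` call would wait until 500 items arrived or the block completed.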
https://codereview.stackexchange.com/questions/220215