
Processing streaming data with Dataflow

Code Review user
Asked 2019-05-14 05:08:50
1 answer, 845 views, 0 followers, 5 votes

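I have a WebSocket connection to a third-party API. The API returns a large amount of data: at peak times it sends hundreds of messages per second. I need to process the data for two purposes: save all of it to a DB and send some of it to RabbitMQ.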

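The idea is as follows: I want to save data to the DB when the batch size reaches 1000 or when a timeout expires, with the timeout set to 3 seconds.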
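The size-or-timeout rule maps onto two `BatchBlock<T>` behaviours: the block emits a batch on its own once `batchSize` items have arrived, and `TriggerBatch()` forces out whatever has accumulated so far, which is what a timer callback would call. The sketch below isolates the second behaviour; `TriggerBatchDemo` and `FlushPartialBatch` are hypothetical names for illustration, not part of the question's code.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: shows how TriggerBatch releases a partial batch,
// which is the mechanism a timeout-driven flush relies on.
public static class TriggerBatchDemo
{
    public static int FlushPartialBatch(int batchSize, int count)
    {
        var batchBlock = new BatchBlock<int>(batchSize);

        // With count < batchSize the block will not emit a batch on its own.
        for (int i = 0; i < count; i++)
        {
            batchBlock.Post(i);
        }

        // Force out whatever has accumulated, as a timeout would.
        batchBlock.TriggerBatch();

        // Wait up to 5 s for the batch; Receive throws TimeoutException if none arrives.
        int[] batch = batchBlock.Receive(TimeSpan.FromSeconds(5));
        return batch.Length;
    }
}
```

`FlushPartialBatch(1000, 3)` returns 3. Without the `TriggerBatch()` call, `Receive` would time out, because the block would still be waiting for 997 more items.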

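I want to publish data to RabbitMQ with a 1-second timeout. This is a form of throttling, because there is so much data. In addition, I need to select the last record for each ticker, which I have done in the ActionBlock. For example, suppose a batch contains the following records: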

{"Ticker":"MSFT","DateTime":"2019-05-14T10:00:00:100"}
{"Ticker":"MSFT","DateTime":"2019-05-14T10:00:00:150"}
{"Ticker":"AAPL","DateTime":"2019-05-14T10:00:00:300"}

I only need to publish the last message for each ticker, so after filtering I will publish 2 records:

{"Ticker":"MSFT","DateTime":"2019-05-14T10:00:00:150"}
{"Ticker":"AAPL","DateTime":"2019-05-14T10:00:00:300"}
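The filtering step on its own is a plain LINQ query: group the batch by ticker and keep the newest record in each group. A minimal sketch, assuming a hypothetical `Tick` record that carries only the two properties the question's `StreamingMessage` is used with:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for StreamingMessage with only the fields the filter uses.
public sealed record Tick(string Ticker, DateTime DateTime);

public static class TickFilter
{
    // Keep only the newest record for each ticker in a batch.
    public static List<Tick> LatestPerTicker(IEnumerable<Tick> batch) =>
        batch
            .GroupBy(t => t.Ticker)
            // A group always has at least one element, so First() is safe.
            .Select(g => g.OrderByDescending(t => t.DateTime).First())
            .ToList();
}
```

With the three records above it returns the MSFT record at 10:00:00:150 and the AAPL record at 10:00:00:300. On .NET 6 or later, `g.MaxBy(t => t.DateTime)` picks the newest element of each group without sorting it.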

Full code:

Code language: C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers; // System.Timers.Timer (interval constructor + Elapsed event)

public class StreamMessagePipeline<T> where T : StreamingMessage {
    private readonly BatchBlock<T> _saveBatchBlock;
    private readonly BatchBlock<T> _publishBatchBlock;

    public StreamMessagePipeline() {
        _saveBatchBlock = new BatchBlock<T>(1000);
        _publishBatchBlock = new BatchBlock<T>(500);

        SetupSaveBatchPipeline();
        SetupPublishBatchPipeline();
    }

    private void SetupSaveBatchPipeline() {
        var saveBatchTimeOut = TimeSpan.FromSeconds(3);
        var saveBatchTimer = new Timer(saveBatchTimeOut.TotalMilliseconds);

        // On timeout, flush whatever has accumulated as a partial batch
        saveBatchTimer.Elapsed += (s, e) => _saveBatchBlock.TriggerBatch();

        var actionBlockSave = new ActionBlock<IEnumerable<T>>(x => {
            // Reset the timeout since we got a batch
            saveBatchTimer.Stop();
            saveBatchTimer.Start();

            Console.WriteLine($"Save to DB : {x.Count()}");
        });

        _saveBatchBlock.LinkTo(actionBlockSave, new DataflowLinkOptions {
            PropagateCompletion = true
        });

        // A new System.Timers.Timer is disabled; start it so the timeout
        // also applies before the first full batch arrives.
        saveBatchTimer.Start();
    }

    private void SetupPublishBatchPipeline() {
        var publishBatchTimeOut = TimeSpan.FromSeconds(1);
        var publishBatchTimer = new Timer(publishBatchTimeOut.TotalMilliseconds);

        publishBatchTimer.Elapsed += (s, e) => _publishBatchBlock.TriggerBatch();

        var actionBlockPublish = new ActionBlock<IEnumerable<T>>(x => {
            // Keep only the latest message for each ticker
            var res = x.GroupBy(d => d.Ticker)
                .Select(d => d.OrderByDescending(s => s.DateTime).FirstOrDefault())
                .ToList();

            Console.WriteLine($"Publish data to somewhere : {res.Count}");
            // Reset the timeout since we got a batch
            publishBatchTimer.Stop();
            publishBatchTimer.Start();
        });

        _publishBatchBlock.LinkTo(actionBlockPublish, new DataflowLinkOptions {
            PropagateCompletion = true
        });

        publishBatchTimer.Start();
    }

    public async Task Handle(T record) {
        await _saveBatchBlock.SendAsync(record);
        await _publishBatchBlock.SendAsync(record);
    }
}

1 answer

Code Review user

Accepted answer

Answered 2019-05-14 15:35:00

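General comments: I would move the timeouts and batch sizes into private fields. I don't know which version of C# you are using, but you can create a class, or, if you have good tuple support, you can create them like this: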

Code language: C#
private readonly (TimeSpan timeout, int buffer) _saveBatchSettings = (TimeSpan.FromSeconds(3), 1000);
private readonly (TimeSpan timeout, int buffer) _publishBatchSettings = (TimeSpan.FromSeconds(1), 500);

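It's nice to have these outside the main code, because when batching you usually have to tune the timeout and batch size to find the sweet spot, and this makes it easier to find what needs to change.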

Also, you never complete the source. I suggest making the class IDisposable and marking the source as complete in Dispose.
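Completing the head block matters because a `BatchBlock<T>` that is completed emits its leftover items as one final, smaller batch, and `PropagateCompletion` carries completion on to the consumer. The sketch below uses hypothetical names (it is not the answer's class): it sends 2,500 items through a batch size of 1,000, completes the head, and awaits the tail block's `Completion`, which is what tells you the final partial batch has actually been processed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Sends `count` items through BatchBlock -> ActionBlock, completes the head,
    // and returns the size of every batch the ActionBlock received.
    public static async Task<List<int>> RunAsync(int batchSize, int count)
    {
        var seen = new List<int>();
        var batchBlock = new BatchBlock<int>(batchSize);

        // ActionBlock processes one message at a time by default,
        // so appending to a plain List<int> is safe here.
        var saveBlock = new ActionBlock<int[]>(batch => seen.Add(batch.Length));

        batchBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            await batchBlock.SendAsync(i);
        }

        // Completing the BatchBlock flushes the leftover items as a final batch;
        // PropagateCompletion then completes the ActionBlock once it is drained.
        batchBlock.Complete();
        await saveBlock.Completion;
        return seen;
    }
}
```

`RunAsync(1000, 2500)` reports batch sizes 1000, 1000 and 500. Without the `Complete()` call, the last 500 items would sit in the `BatchBlock` indefinitely.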

TPL Dataflow comments: Why not use a BroadcastBlock linked to the BatchBlocks? Then it's one send instead of two.
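A sketch of that fan-out, with illustrative names: one `SendAsync` into a `BroadcastBlock<T>` is offered to every linked block. It assumes the `BatchBlock<T>` targets keep their defaults (greedy, unbounded), so they accept every message; a `BroadcastBlock<T>` retains only its latest message for a target that postpones, so a bounded or non-greedy target could miss messages. The identity cloning function `x => x` mirrors the answer and means both branches share the same instance, which is fine as long as neither branch mutates it.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutDemo
{
    // Sends `count` items once and returns how many items each branch processed.
    public static async Task<(int saved, int published)> RunAsync(int count)
    {
        int saved = 0, published = 0;

        // Identity cloning: both branches receive the same instance.
        var source = new BroadcastBlock<int>(x => x);
        var saveBatch = new BatchBlock<int>(1000);
        var publishBatch = new BatchBlock<int>(500);

        // Each ActionBlock runs sequentially and writes only its own counter.
        var save = new ActionBlock<int[]>(b => saved += b.Length);
        var publish = new ActionBlock<int[]>(b => published += b.Length);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(saveBatch, link);
        source.LinkTo(publishBatch, link);
        saveBatch.LinkTo(save, link);
        publishBatch.LinkTo(publish, link);

        for (int i = 0; i < count; i++)
        {
            await source.SendAsync(i); // one send per message
        }

        // Completion flows through both branches, flushing partial batches.
        source.Complete();
        await Task.WhenAll(save.Completion, publish.Completion);
        return (saved, published);
    }
}
```

`RunAsync(2500)` counts 2,500 items on both branches, even though each item was sent only once.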

Move this:

Code language: C#
var res = x.GroupBy(d => d.Ticker).Select(d => d.OrderByDescending(s => s.DateTime).FirstOrDefault()).ToList();

into a TransformBlock, and have the ActionBlock contain only the code that publishes the result.
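Split that way, the TransformBlock owns the per-ticker filtering and the ActionBlock does nothing but publish. A minimal sketch, assuming a hypothetical `Tick` record and a `PublishBatch` method that only records its input in place of a real RabbitMQ call; neither is one of the question's types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message shape: only the fields the filter needs.
public sealed record Tick(string Ticker, DateTime DateTime);

public sealed class PublishPipeline
{
    private readonly TransformBlock<Tick[], List<Tick>> _latestPerTicker;
    private readonly ActionBlock<List<Tick>> _publish;

    // Collected instead of really publishing, so the sketch can be checked.
    public List<List<Tick>> Published { get; } = new();

    public PublishPipeline()
    {
        // The TransformBlock does the filtering...
        _latestPerTicker = new TransformBlock<Tick[], List<Tick>>(LatestPerTicker);
        // ...and the ActionBlock only publishes.
        _publish = new ActionBlock<List<Tick>>(PublishBatch);
        _latestPerTicker.LinkTo(_publish, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public ITargetBlock<Tick[]> Input => _latestPerTicker;
    public Task Completion => _publish.Completion;

    private static List<Tick> LatestPerTicker(Tick[] batch) =>
        batch.GroupBy(t => t.Ticker)
             .Select(g => g.OrderByDescending(t => t.DateTime).First())
             .ToList();

    // Placeholder for the hypothetical RabbitMQ publish.
    private void PublishBatch(List<Tick> latest) => Published.Add(latest);
}
```

Posting the three-record example batch and completing `Input` leaves one published list containing two records, one per ticker.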

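For the ActionBlocks, and this is just my preference, I create a method for the action block to call instead of using an inline lambda. That code is usually the processing code, and it is easier to maintain and read when it lives in its own method, outside the pipeline setup.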

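Reactive Extensions: If you are willing to add Reactive Extensions to your project, you can mix Rx and TPL Dataflow blocks. Rx has a built-in method for buffering called Buffer. Dataflow blocks have AsObservable and AsObserver extension methods for switching between Rx and TPL Dataflow. Here is an example of combining Rx with TPL Dataflow: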

Code language: C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;            // Rx (System.Reactive package): Buffer, Where
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class StreamMessagePipeline<T> : IDisposable
    where T : StreamingMessage
{
    private readonly BroadcastBlock<T> _source;

    private readonly (TimeSpan timeout, int buffer) _saveBatchSettings = (TimeSpan.FromSeconds(3), 1000);
    private readonly (TimeSpan timeout, int buffer) _publishBatchSettings = (TimeSpan.FromSeconds(1), 500);

    public StreamMessagePipeline()
    {
        _source = new BroadcastBlock<T>(x => x);
        SetupSaveBatchPipeline();
        SetupPublishBatchPipeline();
    }

    public async Task Handle(T record)
    {
        await _source.SendAsync(record);
    }

    private void SetupSaveBatchPipeline()
    {
        var actionBlockSave = new ActionBlock<IList<T>>(SaveBatch);

        // Instead of an action block you could stay in Rx and just use Subscribe
        _source.AsObservable()
            .Buffer(_saveBatchSettings.timeout, _saveBatchSettings.buffer)
            .Where(x => x.Count > 0) // unlike TriggerBatch, Buffer will send out an empty list
            .Subscribe(actionBlockSave.AsObserver());
    }

    private void SetupPublishBatchPipeline()
    {
        var transformBlock = new TransformBlock<IList<T>, IList<T>>(x =>
        {
            return x.GroupBy(d => d.Ticker)
                .Select(d => d.OrderByDescending(s => s.DateTime).FirstOrDefault()).ToList();
        });

        var actionBlockPublish = new ActionBlock<IList<T>>(PublishBatch);

        transformBlock.LinkTo(actionBlockPublish, new DataflowLinkOptions()
        {
            PropagateCompletion = true,
        });

        _source.AsObservable()
            .Buffer(_publishBatchSettings.timeout, _publishBatchSettings.buffer)
            .Where(x => x.Count > 0) // unlike TriggerBatch, Buffer will send out an empty list
            .Subscribe(transformBlock.AsObserver());
    }

    private void SaveBatch(IList<T> messages)
    {
        // Save the batch
    }

    private void PublishBatch(IList<T> messages)
    {
        // Publish the batch
    }

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                _source.Complete();
            }
            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    #endregion
}

Update: If you don't want to use Rx, you can still build a custom block with the same functionality.

Code language: C#
using System;
using System.Threading;                 // System.Threading.Timer
using System.Threading.Tasks.Dataflow;

public static class TimerBatchBlock
{
    public static IPropagatorBlock<T, T[]> Create<T>(int batchSize, TimeSpan timeSpan, GroupingDataflowBlockOptions options = null)
    {
        var batchBlock = new BatchBlock<T>(batchSize, options ?? new GroupingDataflowBlockOptions());
        var broadCastBlock = new BroadcastBlock<T[]>(x => x);
        var bufferBlock = new BufferBlock<T[]>();

        // timer setup (System.Threading.Timer): periodically force out a partial batch
        var timer = new Timer(x => ((BatchBlock<T>)x).TriggerBatch(), batchBlock, timeSpan, timeSpan);
        var resetTimerBlock = new ActionBlock<T[]>(_ => timer.Change(timeSpan, timeSpan)); // reset timer each time a batch is output
        resetTimerBlock.Completion.ContinueWith(_ => timer.Dispose());

        // link everything up
        var linkOptions = new DataflowLinkOptions()
        {
            PropagateCompletion = true,
        };
        broadCastBlock.LinkTo(resetTimerBlock, linkOptions);
        broadCastBlock.LinkTo(bufferBlock, linkOptions);
        batchBlock.LinkTo(broadCastBlock, linkOptions);

        // Callers see a single block: items in via batchBlock, batches out via bufferBlock
        return DataflowBlock.Encapsulate(batchBlock, bufferBlock);
    }
}

You can then link BroadcastBlock -> TimerBatchBlock -> ActionBlock.

Votes: 5
Original page content provided by Code Review. Translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://codereview.stackexchange.com/questions/220215
