首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >处理System.Reactive的订阅中的异常

处理System.Reactive的订阅中的异常
EN

Stack Overflow用户
提问于 2021-12-03 07:42:56
回答 1查看 35关注 0票数 0

我想使用System.Reactive显示以下所有订阅的例外。

下面的一小段代码,您可以看到我尝试了什么,但它仍然没有显示消息。

代码语言:javascript
复制
client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        throw new Exception("Asd");

        Guard.Against.Null(response, nameof(response));
        Guard.Against.Null(response.Data, nameof(response.Data),
            "Something went wrong and the aggregated trade object is null");

        var trade = response.Data;
        Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                        $"Price: {trade.Price} Size: {trade.Quantity}");
    }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
    .DisposeWith(disposable);

我该如何处理/显示所有.Subscribe的异常,因为我所做的不起作用?也许,扩展方法可能会有所帮助。

完整代码

代码语言:javascript
复制
public class Program
{
    private static readonly ManualResetEvent ExitEvent = new(false);

    private static async Task Main()
    {
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .Enrich.FromLogContext()
            .WriteTo.Console(LogEventLevel.Debug, theme: SystemConsoleTheme.Colored)
            .WriteTo.File(Path.Combine("logs", "verbose.log"), rollingInterval: RollingInterval.Day)
            .CreateLogger();

        var disposable = new CompositeDisposable();
        var uri = new Uri("wss://stream.binance.com:9443");

        using var communicator = new BinanceWebSocketCommunicator(uri);

        communicator.Name = "Binance-Spot";
        communicator.ReconnectTimeout = TimeSpan.FromMinutes(10);

        communicator.ReconnectionHappened
            .Subscribe(info => Log.Information($"Reconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        communicator.DisconnectionHappened
            .Subscribe(info => Log.Information($"Disconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        using var client = new BinanceWebSocketClient(communicator);

        client.Streams.PongStream
            .Subscribe(x => Log.Information($"Pong received ({x.Message})"))
            .DisposeWith(disposable);

        client.Streams.AggregateTradesStream
            .Subscribe(response =>
            {
                throw new Exception("Asd");

                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the aggregated trade object is null");

                var trade = response.Data;
                Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                                $"Price: {trade.Price} Size: {trade.Quantity}");
            }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
            .DisposeWith(disposable);

        client.Streams.KlineStream
            .Subscribe(response =>
            {
                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the kline object is null");
                Guard.Against.Null(response.Data.Data, nameof(response.Data.Data),
                    "Something went wrong and the kline data object is null");

                var kline = response.Data;
                var klineData = response.Data.Data;

                Log.Information($"Kline [{kline.Symbol}] " +
                                $"Kline start time: {klineData.StartTime} " +
                                $"Kline close time: {klineData.CloseTime} " +
                                $"Interval: {klineData.Interval} " +
                                $"First trade ID: {klineData.FirstTradeId} " +
                                $"Last trade ID: {klineData.LastTradeId} " +
                                $"Open price: {klineData.OpenPrice} " +
                                $"Close price: {klineData.ClosePrice} " +
                                $"High price: {klineData.HighPrice} " +
                                $"Low price: {klineData.LowPrice} " +
                                $"Base asset volume: {klineData.BaseAssetVolume} " +
                                $"Number of trades: {klineData.NumberTrades} " +
                                $"Is this kline closed?: {klineData.IsClosed} " +
                                $"Quote asset volume: {klineData.QuoteAssetVolume} " +
                                $"Taker buy base: {klineData.TakerBuyBaseAssetVolume} " +
                                $"Taker buy quote: {klineData.TakerBuyQuoteAssetVolume} " +
                                $"Ignore: {klineData.Ignore} ");
            })
            .DisposeWith(disposable);

        client.AddSubscription(
            new AggregateTradeSubscription("bnbusdt"),
            new KlineSubscription("btcusdt", "1h"));

        await communicator.Start().ConfigureAwait(false);

        ExitEvent.WaitOne();

        disposable.Dispose();

        Log.CloseAndFlush();
    }
}
EN

回答 1

Stack Overflow用户

发布于 2021-12-04 16:24:43

我意识到为了在.Subscribe中显示异常,我必须将它包装在.Select/.SelectManytry catch块中。

代码语言:javascript
复制
client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        try
        {
            Guard.Against.Null(response, nameof(response));
            Guard.Against.Null(response.Data, nameof(response.Data));

            var trade = response.Data;
            Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                            $"Price: {trade.Price} Size: {trade.Quantity}");
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Exception while receiving message");
        }
    })
    .DisposeWith(disposable);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70210962

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档