首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用System.Reactive反序列化消息

使用System.Reactive反序列化消息
EN

Stack Overflow用户
提问于 2012-02-24 14:41:53
回答 2查看 305关注 0票数 2

我目前有一个程序,它侦听网络流,并在新消息被反序列化时触发事件。

代码语言:javascript
复制
while(true)
{
  byte[] lengthBytes = new byte[10];
  networkStream.Read(lengthBytes, 0, 10);
  int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
  var messageBytes = new byte[messageLength + 10];
  Array.Copy(lengthBytes, messageBytes, 10);
  int bytesReadTotal = 10;
  while (bytesReadTotal < 10 + messageLength)
    bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
  OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}

我想使用反应式扩展重写它,这样就有了IObservable<Message>而不是event。这可以使用以下命令来完成

代码语言:javascript
复制
Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
  (h) => NewMessage += h,
  (h) => NewMessage -= h)
    .Select(  (e) => { return e.Message; });

但是,我更喜欢使用System.Reactive重写侦听过程。我的出发点(来自here)是

代码语言:javascript
复制
Func<byte[], int, int, IObservable<int>> read;   
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead,
networkStream.EndRead);

这允许

代码语言:javascript
复制
byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{
  (bytesRead) => ;
});

不过,我正在努力寻找如何继续下去的方法。有谁有实现的吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2012-02-24 18:25:02

我想出了以下方法,但我觉得不创建类和使用Subject<T> (例如,通过将头数据包投影到正文数据包到消息对象,但问题是EndRead()不返回字节数组,而是返回读取的字节数组)应该是可能的。所以你需要一个对象,或者至少在某些时候需要一个闭包)。

代码语言:javascript
复制
class Message
{
    public string Text { get; set; }
}

class MessageStream : IObservable<Message>
{
    private readonly Subject<Message> messages = new Subject<Message>();

    public void Start()
    {
        // Get your real network stream here.
        var stream  = Console.OpenStandardInput();
        GetNextMessage( stream );
    }

    private void GetNextMessage(Stream stream)
    {
        var header = new byte[10];
        var read = Observable.FromAsyncPattern<byte [], int, int, int>( stream.BeginRead, stream.EndRead );
        read( header, 0, 10 ).Subscribe( b =>
        {
            var bodyLength = BitConverter.ToInt32( header, 0 );
            var body = new byte[bodyLength];
            read( body, 0, bodyLength ).Subscribe( b2 =>
            {
                var message = new Message() {Text = Encoding.UTF8.GetString( body )};
                messages.OnNext( message );
                GetNextMessage( stream );
            } );
        } );
    }

    public IDisposable Subscribe( IObserver<Message> observer )
    {
        return messages.Subscribe( observer );
    }
}
票数 1
EN

Stack Overflow用户

发布于 2012-02-25 13:42:16

由于Observable.FromAsyncPattern只进行一次异步调用,因此您需要创建一个将多次调用它的函数。这应该会让你入门,但可能还有很大的改进空间。它假定您可以使用相同的参数重复进行异步调用,并假定selector将处理由此产生的任何问题。

代码语言:javascript
复制
Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
             begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
             [end] As Func(Of IAsyncResult, TCallResult),
             selector As Func(Of TCallResult, TResult),
             isComplete As Func(Of TCallResult, Boolean)
            ) As Func(Of T1, T2, T3, IObservable(Of TResult))
    Return Function(a1, a2, a3) Observable.Create(Of TResult)(
        Function(obs)
            Dim serial As New SerialDisposable()
            Dim fac = Observable.FromAsyncPattern(begin, [end])
            Dim onNext As Action(Of TCallResult) = Nothing
            'this function will restart the subscription and will be
            'called every time a value is found
            Dim subscribe As Func(Of IDisposable) =
                Function()
                    'note that we are REUSING the arguments, the
                    'selector should handle this appropriately
                    Return fac(a1, a2, a3).Subscribe(onNext,
                                                     Sub(ex)
                                                         obs.OnError(ex)
                                                         serial.Dispose()
                                                     End Sub)
                End Function
            'set up the OnNext handler to restart the observer 
            'every time it completes
            onNext = Sub(v)
                         obs.OnNext(selector(v))
                         'subscriber disposed, do not check for completion
                         'or resubscribe
                         If serial.IsDisposed Then Exit Sub
                         If isComplete(v) Then
                             obs.OnCompleted()
                             serial.Dispose()
                         Else
                             'using the scheduler lets the OnNext complete before
                             'making the next async call.
                             'you could parameterize the scheduler, but it may not be
                             'helpful, and it won't work if Immediate is passed.
                             Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
                         End If
                     End Sub
            'start the first subscription
            serial.Disposable = subscribe()
            Return serial
        End Function)
End Function

在这里,您可以获得如下所示的IObservable(Of Byte)

代码语言:javascript
复制
Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
                 AddressOf stream.BeginRead, AddressOf stream.EndRead,
                 Function(numRead)
                     If numRead < 0 Then Throw New ArgumentException("Invalid number read")
                     Console.WriteLine("Position after read: " & stream.Position.ToString())
                     Dim ret(numRead - 1) As Byte
                     Array.Copy(buffer, ret, numRead)
                     Return ret
                 End Function,
                 Function(numRead) numRead <= 0)
'this will be an observable of the chunk size you specify
Dim obs = obsFac(buffer, 0, buffer.Length)

在那里,您将需要某种累加器函数,该函数接受字节数组并在找到字节数组时输出完整的消息。这样一个函数的框架可能如下所示:

代码语言:javascript
复制
Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
    Return Observable.Create(Of message)(
        Function(obs)
            Dim accumulator As New List(Of Byte)
            Return source.Subscribe(
                Sub(buffer)
                    'do some logic to build a packet here
                    accumulator.AddRange(buffer)
                    If True Then
                        obs.OnNext(New message())
                        'reset accumulator
                    End If
                End Sub,
                AddressOf obs.OnError,
                AddressOf obs.OnCompleted)
        End Function)
End Function
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/9426518

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档