I currently have a program that listens on a network stream and raises an event whenever a new message has been deserialized.
while(true)
{
    byte[] lengthBytes = new byte[10];
    networkStream.Read(lengthBytes, 0, 10);
    int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
    var messageBytes = new byte[messageLength + 10];
    Array.Copy(lengthBytes, messageBytes, 10);
    int bytesReadTotal = 10;
    while (bytesReadTotal < 10 + messageLength)
        bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
    OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}
I would like to rewrite this using Reactive Extensions, so that there is an IObservable<Message> instead of an event. That can be done with the following:
Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
        (h) => NewMessage += h,
        (h) => NewMessage -= h)
    .Select( (e) => { return e.Message; });
However, I would prefer to rewrite the listening process itself using System.Reactive. My starting point (from here) is
Func<byte[], int, int, IObservable<int>> read;
read = Observable.FromAsyncPattern<byte[], int, int, int>(
    networkStream.BeginRead,
    networkStream.EndRead);
This allows
byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
    (bytesRead) => { /* continue from here */ });
However, I am struggling to see how to continue from here. Does anyone have an implementation?
Posted on 2012-02-24 18:25:02
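I came up with the following, but I feel it should be possible without creating a class and using Subject<T> (for example, by projecting the header packet to the body packet to the message object). The problem is that EndRead() does not return the byte array, only the number of bytes read, so you need an object, or at least a closure, at some point.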
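using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;

class Message
{
    public string Text { get; set; }
}

class MessageStream : IObservable<Message>
{
    private readonly Subject<Message> messages = new Subject<Message>();

    public void Start()
    {
        // Get your real network stream here.
        var stream = Console.OpenStandardInput();
        GetNextMessage( stream );
    }

    private void GetNextMessage(Stream stream)
    {
        var header = new byte[10];
        var read = Observable.FromAsyncPattern<byte [], int, int, int>( stream.BeginRead, stream.EndRead );
        // b and b2 are the numbers of bytes read. A stream read may return fewer
        // bytes than requested; this sample assumes each read fills its buffer.
        read( header, 0, 10 ).Subscribe( b =>
        {
            // Interprets the first 4 header bytes as a binary Int32
            // (the question's header is a 10-character decimal string instead).
            var bodyLength = BitConverter.ToInt32( header, 0 );
            var body = new byte[bodyLength];
            read( body, 0, bodyLength ).Subscribe( b2 =>
            {
                var message = new Message() {Text = Encoding.UTF8.GetString( body )};
                messages.OnNext( message );
                // Start reading the next message only after this one has been published.
                GetNextMessage( stream );
            } );
        } );
    }

    public IDisposable Subscribe( IObserver<Message> observer )
    {
        return messages.Subscribe( observer );
    }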
}
Posted on 2012-02-25 13:42:16
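Since Observable.FromAsyncPattern only makes the async call once, you will need a function that calls it repeatedly. The following should get you started, but there is probably a lot of room for improvement. It assumes that you can make the async call repeatedly with the same arguments, and that the selector will handle any issues that arise from this.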
Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
        begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
        [end] As Func(Of IAsyncResult, TCallResult),
        selector As Func(Of TCallResult, TResult),
        isComplete As Func(Of TCallResult, Boolean)
    ) As Func(Of T1, T2, T3, IObservable(Of TResult))
    Return Function(a1, a2, a3) Observable.Create(Of TResult)(
        Function(obs)
            Dim serial As New SerialDisposable()
            Dim fac = Observable.FromAsyncPattern(begin, [end])
            Dim onNext As Action(Of TCallResult) = Nothing
            'this function will restart the subscription and will be
            'called every time a value is found
            Dim subscribe As Func(Of IDisposable) =
                Function()
                    'note that we are REUSING the arguments, the
                    'selector should handle this appropriately
                    Return fac(a1, a2, a3).Subscribe(onNext,
                                                     Sub(ex)
                                                         obs.OnError(ex)
                                                         serial.Dispose()
                                                     End Sub)
                End Function
            'set up the OnNext handler to restart the observer
            'every time it completes
            onNext = Sub(v)
                         obs.OnNext(selector(v))
                         'subscriber disposed, do not check for completion
                         'or resubscribe
                         If serial.IsDisposed Then Exit Sub
                         If isComplete(v) Then
                             obs.OnCompleted()
                             serial.Dispose()
                         Else
                             'using the scheduler lets the OnNext complete before
                             'making the next async call.
                             'you could parameterize the scheduler, but it may not be
                             'helpful, and it won't work if Immediate is passed.
                             Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
                         End If
                     End Sub
            'start the first subscription
            serial.Disposable = subscribe()
            Return serial
        End Function)
End Function
From here, you can get an IObservable(Of Byte()) like so:
Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
    AddressOf stream.BeginRead, AddressOf stream.EndRead,
    Function(numRead)
        If numRead < 0 Then Throw New ArgumentException("Invalid number read")
        'Position is only supported on seekable streams; a NetworkStream throws
        'NotSupportedException here, so remove this line for network use
        Console.WriteLine("Position after read: " & stream.Position.ToString())
        'copy the bytes out, because the shared buffer is reused by the next read
        Dim ret(numRead - 1) As Byte
        Array.Copy(buffer, ret, numRead)
        Return ret
    End Function,
    Function(numRead) numRead <= 0)
'this will be an observable of byte chunks, each at most the buffer size you specify
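Dim obs = obsFac(buffer, 0, buffer.Length)
From there, you will need some sort of accumulator function that takes the byte arrays and outputs a complete message whenever one is found. The skeleton of such a function might look like this: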
Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
    Return Observable.Create(Of Message)(
        Function(obs)
            Dim accumulator As New List(Of Byte)
            Return source.Subscribe(
                Sub(buffer)
                    'do some logic to build a packet here
                    accumulator.AddRange(buffer)
                    'placeholder condition: replace with a check that a full packet has arrived
                    If True Then
                        obs.OnNext(New Message())
                        'reset accumulator
                    End If
                End Sub,
                AddressOf obs.OnError,
                AddressOf obs.OnCompleted)
        End Function)
End Function
https://stackoverflow.com/questions/9426518
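The accumulator skeleton above leaves the packet-building logic open. As a rough sketch of what that logic could look like for the framing used in the question (a 10-character decimal length header followed by the body), the class below buffers incoming chunks and returns every complete frame it can cut from the buffer. `FrameAccumulator` and `Append` are hypothetical names introduced for illustration and are not part of either answer or of Rx. Each returned frame keeps its 10-byte header, matching the `messageBytes` layout that the question passes to `messageFactory.GetMessage`.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper (an assumption, not from the original answers):
// collects chunks of arbitrary size and cuts them into complete frames.
public sealed class FrameAccumulator
{
    // Assumed framing, taken from the question: 10 decimal digits, then the body.
    private const int HeaderLength = 10;
    private readonly List<byte> buffer = new List<byte>();

    // Adds a chunk and returns all frames (header + body) that are now complete.
    public IReadOnlyList<byte[]> Append(byte[] chunk)
    {
        buffer.AddRange(chunk);
        var frames = new List<byte[]>();

        while (buffer.Count >= HeaderLength)
        {
            // Decode the length prefix the same way the question's loop does.
            var header = buffer.GetRange(0, HeaderLength).ToArray();
            int bodyLength = int.Parse(Encoding.UTF8.GetString(header));
            if (bodyLength < 0)
                throw new FormatException("Negative message length in header.");

            int frameLength = HeaderLength + bodyLength;
            if (buffer.Count < frameLength)
                break; // the body has not fully arrived yet; wait for more data

            frames.Add(buffer.GetRange(0, frameLength).ToArray());
            buffer.RemoveRange(0, frameLength);
        }

        return frames;
    }
}
```

Assuming `chunks` is the `IObservable<byte[]>` produced by the repeated read, something like `chunks.SelectMany(chunk => accumulator.Append(chunk)).Select(messageFactory.GetMessage)` should then give an observable of messages. The accumulator is stateful, so this sketch expects one instance per subscription and chunks delivered one at a time.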