假设我们有一个只有输出的流,它输出字节.流中的数据是序列化消息,每条消息总是以字节序列(0xAA, 0xBB, 0xCC)开头,但是消息的长度是未知的。
目前,我创建了一个可观测的字节,并在流上发出每个字节,然后订阅这个可观测的字节,缓冲每一个发射,找到字节序列,然后发出缓冲区。有点像
List<byte> buffer = new List<byte>();
dataStream.subscribe(b => {
buffer.add(b);
int[] idx = SearchSequence(buffer);
if(idx.Length < 2){
// TODO: wait for more data
}
else{
messageStream.onNext(buffer.GetRange(idx[0], idx[1]));
// TODO: remove them from buffer
}
})有没有更好的方法来解决这个问题?据我所知,有两个问题:
更新:
如何检测消息的结尾?
消息之间没有隔阂,消息就在彼此的旁边。因此,消息(0xAA, 0xBB, 0xCC)的起始序列也是前一条消息的结束序列。
你的输入是什么样子的?
我现在的代码是:
Observer<byte> ob = null;
var dataStream = Observable.Create<byte>(o => ob = o);
while(true){
ob.OnNext(ms.ReadByte());
}你希望你的输出能被观察到是什么样子?
可以观察到的发出信息的人
Observable<byte[]>发布于 2018-02-07 16:41:57
我不知道这有多优雅,但也许它会让你(或其他人)开始。我假设您希望将(0xAA, 0xBB, 0xCC)头从消息中排除在外:
var s = new Subject<byte>();
IObservable<byte[]> results = s.Publish(_s => Observable.When(_s
.And(_s.Skip(1))
.And(_s.Skip(2))
.Then((a, b, c) => (a, b, c))
))
.Publish(_t => _t
.Buffer(_t.Where(t => t.a == 0xAA && t.b == 0xBB && t.c == 0xCC))
.Select(l => (l[l.Count - 1].a == 0xAA && l[l.Count - 1].b == 0xBB && l[l.Count - 1].c == 0xCC
? l.Take(l.Count - 3)
: l
)
.Select(e => e.c)
.ToArray()
)
.Skip(1)
)
;解释:
我们首先使用And/Then/When进行双重压缩,因此流(0xAA, 0xBB, 0xCC, 0x01, 0x02, 0x03, 0xAA, 0xBB, 0xCC, 0x01, 0x02, 0x03)变成了如下所示的元组流:
(0xAA, 0xBB, 0xCC)
(0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
(0x02, 0x03, 0xAA)
(0x03, 0xAA, 0xBB)
(0xAA, 0xBB, 0xCC)
(0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)然后,当您有一个类似于.Where的元组时,我们使用(0xAA, 0xBB, 0xCC)来嗅探,并使用它作为缓冲区边界。
一旦您有了缓冲区边界,它们实际上在消息启动程序发生后就被切断了,因此您最终得到了两个消息的示例流,最后得到了三个元组列表:
List 1: (0xAA, 0xBB, 0xCC)
List 2: (0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
(0x02, 0x03, 0xAA)
(0x03, 0xAA, 0xBB)
(0xAA, 0xBB, 0xCC)
List 3: (0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)每个列表中我们想要的字节基本上是第三列,但是如果我们想从消息中排除消息--从消息中引入,那么我们必须进行一些清理:我们必须从列表2中排除最后三个元素(以及所有其他‘中间’列表),我们必须删除列表1,我们必须保留列表3。删除第一个列表是由.Skip(1)在最后完成的。从中间列表中去掉最后三个元素的方法是检查列表中的最后一个元素是否是(0xAA, 0xBB, 0xCC),如果是,则提取除最后三个元素之外的所有元素。
考虑到这些,我希望有更好的方法。
以下是一些跑步代码:
results.Dump(); //Linqpad
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0x01);
s.OnNext(0x02);
s.OnNext(0x03);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0x01);
s.OnNext(0x02);
s.OnNext(0x03);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0xCC);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0x04);
s.OnNext(0x05);
s.OnNext(0x06);
s.OnNext(0x07);
s.OnCompleted();输出:
01 02 03
01 02 03 AA BB
CC AA BB 04 05 06 07 发布于 2020-12-13 15:47:08
下面是一个定制的Window操作符,它根据所提供的分隔符数组将序列拆分为子序列。它基于接受Window参数的内置IObservable<TWindowBoundary> windowBoundaries运算符.
/// <summary>Projects each element of an observable sequence into non-overlapping
/// windows that are separated by the provided separator.</summary>
public static IObservable<IObservable<T>> Window<T>(
this IObservable<T> source,
T[] separator,
IEqualityComparer<T> comparer = null)
{
return Observable.Defer(() =>
{
var boundaries = new Subject<Unit>();
var queue = new Queue<T>(separator.Length);
return source
.Do(x =>
{
if (queue.Count == separator.Length) queue.Dequeue();
queue.Enqueue(x);
if (queue.SequenceEqual(separator, comparer))
{
queue.Clear();
boundaries.OnNext(default);
}
})
.Concat(Observable.Repeat(default(T), separator.Length - 1))
.SkipLast(separator.Length - 1)
.Window(boundaries)
.Select((window, i) => i == 0 ? window : window.Skip(separator.Length));
});
}Subject用于通知Window操作符已检测到新的边界。检测机制包括一个Queue,它保存最后发出的元素。每次发出新元素时,都会将此队列与separator进行比较。Window操作符故意落后于separator.Length - 1元素的检测机制,从而使结果窗口正确地对齐。
用法示例:
IObservable<byte> dataStream = GetDataStream();
IObservable<byte[]> messageStream = dataStream
.Window(new byte[] { 0xAA, 0xBB, 0xCC })
.SelectMany(window => window.ToArray());https://stackoverflow.com/questions/48654742
复制相似问题