首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何以反应性的方式将字节分组为消息

如何以反应性的方式将字节分组为消息
EN

Stack Overflow用户
提问于 2018-02-07 01:47:06
回答 2查看 136关注 0票数 4

假设我们有一个只有输出的流,它输出字节.流中的数据是序列化消息,每条消息总是以字节序列(0xAA, 0xBB, 0xCC)开头,但是消息的长度是未知的。

目前,我创建了一个可观测的字节,并在流上发出每个字节,然后订阅这个可观测的字节,缓冲每一个发射,找到字节序列,然后发出缓冲区。有点像

代码语言:javascript
复制
List<byte> buffer = new List<byte>();
dataStream.subscribe(b => {
    buffer.add(b);
    int[] idx = SearchSequence(buffer);
    if(idx.Length < 2){
        // TODO: wait for more data
    }
    else{
        messageStream.onNext(buffer.GetRange(idx[0], idx[1]));
        // TODO: remove them from buffer
    }
})

有没有更好的方法来解决这个问题?据我所知,有两个问题:

  1. 消息的长度不是固定的,这会使Observable.buffer()失效
  2. 当订阅dataStream (或其他可观察到的消息)时,流的输出可能位于消息的中间。

更新:

如何检测消息的结尾?

消息之间没有隔阂,消息就在彼此的旁边。因此,消息(0xAA, 0xBB, 0xCC)的起始序列也是前一条消息的结束序列。

你的输入是什么样子的?

我现在的代码是:

代码语言:javascript
复制
Observer<byte> ob = null;
var dataStream = Observable.Create<byte>(o => ob = o);
while(true){
    ob.OnNext(ms.ReadByte());
}

你希望你的输出能被观察到是什么样子?

可以观察到的发出信息的人

代码语言:javascript
复制
Observable<byte[]>
EN

回答 2

Stack Overflow用户

发布于 2018-02-07 16:41:57

我不知道这有多优雅,但也许它会让你(或其他人)开始。我假设您希望将(0xAA, 0xBB, 0xCC)头从消息中排除在外:

代码语言:javascript
复制
var s = new Subject<byte>();

IObservable<byte[]> results = s.Publish(_s => Observable.When(_s
        .And(_s.Skip(1))
        .And(_s.Skip(2))
        .Then((a, b, c) => (a, b, c))
    ))
    .Publish(_t => _t
        .Buffer(_t.Where(t => t.a == 0xAA && t.b == 0xBB && t.c == 0xCC))
        .Select(l => (l[l.Count - 1].a == 0xAA && l[l.Count - 1].b == 0xBB && l[l.Count - 1].c == 0xCC
                ? l.Take(l.Count - 3)
                : l
            )
            .Select(e => e.c)
            .ToArray()
        )
        .Skip(1)
    )
;

解释:

我们首先使用And/Then/When进行双重压缩,因此流(0xAA, 0xBB, 0xCC, 0x01, 0x02, 0x03, 0xAA, 0xBB, 0xCC, 0x01, 0x02, 0x03)变成了如下所示的元组流:

代码语言:javascript
复制
(0xAA, 0xBB, 0xCC)
(0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
(0x02, 0x03, 0xAA)
(0x03, 0xAA, 0xBB)
(0xAA, 0xBB, 0xCC)
(0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)

然后,当您有一个类似于.Where的元组时,我们使用(0xAA, 0xBB, 0xCC)来嗅探,并使用它作为缓冲区边界。

一旦您有了缓冲区边界,它们实际上在消息启动程序发生后就被切断了,因此您最终得到了两个消息的示例流,最后得到了三个元组列表:

代码语言:javascript
复制
List 1: (0xAA, 0xBB, 0xCC)

List 2: (0xBB, 0xCC, 0x01)
        (0xCC, 0x01, 0x02)
        (0x01, 0x02, 0x03)
        (0x02, 0x03, 0xAA)
        (0x03, 0xAA, 0xBB)
        (0xAA, 0xBB, 0xCC)

List 3: (0xBB, 0xCC, 0x01)
        (0xCC, 0x01, 0x02)
        (0x01, 0x02, 0x03)

每个列表中我们想要的字节基本上是第三列,但是如果我们想从消息中排除消息--从消息中引入,那么我们必须进行一些清理:我们必须从列表2中排除最后三个元素(以及所有其他‘中间’列表),我们必须删除列表1,我们必须保留列表3。删除第一个列表是由.Skip(1)在最后完成的。从中间列表中去掉最后三个元素的方法是检查列表中的最后一个元素是否是(0xAA, 0xBB, 0xCC),如果是,则提取除最后三个元素之外的所有元素。

考虑到这些,我希望有更好的方法。

以下是一些跑步代码:

代码语言:javascript
复制
results.Dump(); //Linqpad

s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0x01);
s.OnNext(0x02);
s.OnNext(0x03);

s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);

s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0x01);
s.OnNext(0x02);
s.OnNext(0x03);
s.OnNext(0xAA);
s.OnNext(0xBB);

s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0xCC);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0x04);
s.OnNext(0x05);
s.OnNext(0x06);
s.OnNext(0x07);
s.OnCompleted();

输出:

代码语言:javascript
复制
01 02 03 

01 02 03 AA BB 
CC AA BB 04 05 06 07 
票数 4
EN

Stack Overflow用户

发布于 2020-12-13 15:47:08

下面是一个定制的Window操作符,它根据所提供的分隔符数组将序列拆分为子序列。它基于接受Window参数的内置IObservable<TWindowBoundary> windowBoundaries运算符.

代码语言:javascript
复制
/// <summary>Projects each element of an observable sequence into non-overlapping
/// windows that are separated by the provided separator.</summary>
public static IObservable<IObservable<T>> Window<T>(
    this IObservable<T> source,
    T[] separator,
    IEqualityComparer<T> comparer = null)
{
    return Observable.Defer(() =>
    {
        var boundaries = new Subject<Unit>();
        var queue = new Queue<T>(separator.Length);
        return source
            .Do(x =>
            {
                if (queue.Count == separator.Length) queue.Dequeue();
                queue.Enqueue(x);
                if (queue.SequenceEqual(separator, comparer))
                {
                    queue.Clear();
                    boundaries.OnNext(default);
                }
            })
            .Concat(Observable.Repeat(default(T), separator.Length - 1))
            .SkipLast(separator.Length - 1)
            .Window(boundaries)
            .Select((window, i) => i == 0 ? window : window.Skip(separator.Length));
    });
}

Subject用于通知Window操作符已检测到新的边界。检测机制包括一个Queue,它保存最后发出的元素。每次发出新元素时,都会将此队列与separator进行比较。Window操作符故意落后于separator.Length - 1元素的检测机制,从而使结果窗口正确地对齐。

用法示例:

代码语言:javascript
复制
IObservable<byte> dataStream = GetDataStream();

IObservable<byte[]> messageStream = dataStream
    .Window(new byte[] { 0xAA, 0xBB, 0xCC })
    .SelectMany(window => window.ToArray());
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48654742

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档