首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >可观测网络IO解析

可观测网络IO解析
EN

Stack Overflow用户
提问于 2012-05-01 19:02:08
回答 2查看 753关注 0票数 7

我试图使用Rx从TCPClient接收流中读取数据,并将数据解析为字符串的IObservable,该字符串由换行符"\r\n“分隔,下面是从套接字流接收数据的方式。

代码语言:javascript
复制
var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

下面是我解析字符串的方法。现在这不管用..。

代码语言:javascript
复制
obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

消息主题以块的形式接收消息,因此我尝试将它们连接起来,并测试连接的字符串是否包含换行符,从而向缓冲区发出关闭和输出缓冲块的信号。不知道为什么不起作用。似乎我只从obsStrings中得到了第一部分。

所以我要找两样东西。我想简化io流的读取,并消除消息主题的使用。其次,我想让我的字符串解析工作。我已经对此进行了一段时间的黑客攻击,无法找到一个可行的解决方案。我是Rx的初学者。

编辑:这是问题解决后的成品.

代码语言:javascript
复制
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
            .Where(x => x.EndsWith("\r\n"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("\n", ""));

"ReceiveUntilCompleted“是RXX项目的扩展。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2012-05-02 21:22:08

代码语言:javascript
复制
messages
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
    .Where(x => x.EndsWith("\r\n"))
票数 3
EN

Stack Overflow用户

发布于 2012-05-01 21:30:35

而不是使用SubscribeSubject,您可以尝试只使用Select吗?

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

现在假设这一切都进入了一个名为messages的新的可观察到的领域,那么您的下一个问题是在这一行中

代码语言:javascript
复制
var obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

您同时使用了BufferScan,并且试图在这两种情况下都做同样的事情!注意,Buffer需要一个关闭选择器。

你真正想要的是:

代码语言:javascript
复制
var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n")))
                         .Select(buffered => String.Join(buffered));

它为缓冲提供了一个关于何时关闭窗口(当它包含\r\n)的可观察性,并给出选择缓冲的数量以连接。这将产生一个新的可观察到的拆分字符串。

一个问题是,您仍然可以将新行放在块的中间,这将导致问题。一个简单的想法是观察字符,而不是完整的字符串块,例如:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

然后可以执行messages.Where(c => c != '\r')跳过\r并将缓冲区更改为:

代码语言:javascript
复制
var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n')))
                         .Select(buffered => String.Join("", buffered));
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/10402770

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档