首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >IObservable与NetMQ接收

IObservable与NetMQ接收
EN

Stack Overflow用户
提问于 2015-03-15 07:51:31
回答 2查看 1.5K关注 0票数 3

我正在尝试编写一个典型的股票交易程序,它接收来自netmq的股票代码/订单/交易,将流转换为IObservable,并在WPF前端显示它们。我尝试使用NetMQ阻塞ReceiveString (假设我期望得到一些字符串输入)来使用异步/等待,这样ReceiveString循环就不会阻塞主(UI)线程。由于我对C#还不熟悉,所以我在这篇文章中接受了Dave的回答:(https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where-is-iasyncenumerable-in-the-lastest-release?forum=rx),并试图写一些这样的例子:

代码语言:javascript
复制
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive;
using System.Reactive.Linq;

namespace App1
{
    class MainClass
    {
        // publisher for testing, should be an external data publisher in real environment
        public static Thread StartPublisher(PublisherSocket s)
        {
            s.Bind("inproc://test");
            var thr = new Thread(() => {
                Console.WriteLine("Start publishing...");
                while (true) {
                    Thread.Sleep(500);
                    s.Send("hello");
                }
            });
            thr.Start();
            return thr;
        }

        public static IObservable<string> Receive(SubscriberSocket s)
        {
            s.Connect("inproc://test");
            s.Subscribe("");
            return Observable.Create<string>(
                async observer =>
                {
                    while (true)
                    {
                        var result = await s.ReceiveString();
                        observer.OnNext(result);
                    }
                });
        }

        public static void Main(string[] args)
        {
            var ctx = NetMQContext.Create();
            var sub = ctx.CreateSubscriberSocket();
            var pub = ctx.CreatePublisherSocket();
            StartPublisher(pub);

            Receive(sub).Subscribe(Console.WriteLine);
            Console.ReadLine();
        }
    }
}

它无法用“无法等待字符串”进行编译。虽然我知道这可能是一项任务,但我不太清楚如何把整件事搞清楚。

再次包装:我试图实现的只是使用简单的阻塞apis从netmq获取代码/订单/交易的IObservable流,而不是真正阻塞主线程。

我能用它做什么吗?非常感谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-03-16 00:47:25

我不熟悉NetMQ,但您确实应该像这样构建可观察到的结构:

代码语言:javascript
复制
    public static IObservable<string> Receive(NetMQContext ctx)
    {
        return Observable
            .Create<string>(o =>
                Observable.Using<string, SubscriberSocket>(() =>
                {
                    var sub = ctx.CreateSubscriberSocket();
                    sub.Connect("inproc://test");
                    sub.Subscribe("");
                    return sub;
                }, sub =>
                Observable
                    .FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
                        h => sub.ReceiveReady += h,
                        h => sub.ReceiveReady -= h)
                    .Select(x => sub.ReceiveString()))
            .Subscribe(o));
    }

这将自动为您创建一个SubscriberSocket,当可观察的结束时,.Dispose()将自动在套接字上被调用。

就像我说的,我不熟悉NetMQ,所以上面的代码没有收到任何已发布的消息,所以您需要修改它才能让它正常工作,但这是一个很好的起点。

票数 6
EN

Stack Overflow用户

发布于 2015-03-15 08:40:59

ReceiveString是不可接受的,所以只需删除等待。

您还可以在这里阅读如何制作一个可访问的套接字:http://somdoron.com/2014/08/netmq-asp-net/

并查看以下关于在RX中使用NetMQ的文章:

http://www.codeproject.com/Articles/853841/NetMQ-plus-RX-Streaming-Data-Demo-App

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29058437

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档