首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Rxx的Tcp时间服务器

使用Rxx的Tcp时间服务器
EN

Stack Overflow用户
提问于 2014-05-15 00:10:12
回答 1查看 68关注 0票数 0

我正在尝试使用Rxx创建一个TCP时间服务器。这个想法只是一个Tcp服务器,它每秒将服务器上的本地时间广播到每个连接的客户端。我可以连接到这个服务器,我看到'ticks‘可观察对象被订阅了,但是客户端没有接收到任何数据。这里我漏掉了什么?这是我为服务器准备的代码。

代码语言:javascript
复制
class Program
{
    static void Main(string[] args)
    {
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .Do(tick => Console.WriteLine("tick: {0}", tick))
            .Publish()
            .RefCount();

        IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007);

        var listener = ObservableSocket.Accept(
                AddressFamily.InterNetwork,
                SocketType.Stream,
                ProtocolType.Tcp,
                serverAddress,
                20)
                .Do(s => Console.WriteLine("connection accepted {0}", s.RemoteEndPoint))
                .Select(s => new StreamWriter(new NetworkStream(s, true)));


        using (listener.Subscribe(
                client => ticks.Subscribe(
                    tick => client.WriteLineAsync(tick),
                    (tex) => Console.WriteLine("ticks error: {0}", tex.Message),
                    () => Console.WriteLine("ticks completed")
                ), 
                (ex) => Console.WriteLine("server error: {0}", ex.Message), 
                () => Console.WriteLine("server completed")
            )
        )
        {
            Console.WriteLine("Time server listening {0}", serverAddress);
            Console.WriteLine("Press ENTER to stop...");
            Console.ReadLine();
        }
    }
}
EN

回答 1

Stack Overflow用户

发布于 2014-05-16 03:50:57

这是我使用的一个基于none Rxx的解决方案,但它并不美观。希望Rxx专家仍然能向我展示一个更优雅的解决方案。

代码语言:javascript
复制
class Program
{
    static List<TcpClient> clients = new List<TcpClient>();

    static void Main(string[] args)
    {
        IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007);

        using(Listen(serverAddress).Subscribe(client => { lock(clients) { clients.Add(client); }}))
        using (Ticks().Subscribe(
            tick => 
            {
                lock(clients)
                {
                    int i = 0;
                    while (i < clients.Count)
                    {
                        var client = clients[i];
                        try
                        {
                            using(var writer = new StreamWriter(client.GetStream(), Encoding.ASCII, client.Client.SendBufferSize, true))
                            {
                                writer.WriteLine(tick.ToString());
                                i++;
                            }
                        }
                        catch(Exception ex)
                        {
                            Console.WriteLine("exception: {0}", ex.Message);
                            clients.Remove(client);
                            Console.WriteLine("client disconnected");
                        }
                    }
                }
            })
        )
        {
            Console.WriteLine("Time server listening {0}", serverAddress);
            Console.WriteLine("Press ENTER to stop...");
            Console.ReadLine();
        }
    }

    static IObservable<TcpClient> Listen(IPEndPoint endpoint)
    {
        return Observable.Create<TcpClient>(
            observer =>
            {
                TcpListener listener = new TcpListener(endpoint);
                listener.Start();
                var subscription = Observable
                    .FromAsync(listener.AcceptTcpClientAsync)
                    .Retry()
                    .Repeat()
                    .Do(client => Console.WriteLine("connection accepted {0}", client.Client.RemoteEndPoint))
                    .Subscribe(observer);
                return new CompositeDisposable(subscription, 
                    Disposable.Create(() => listener.Stop()));
            })
            .Publish()
            .RefCount();
    }

    static IObservable<DateTime> Ticks()
    {
        return Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now)
            .Do(tick => Console.WriteLine("tick: {0}", tick))
            .Publish()
            .RefCount();
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23659881

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档