首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NetworkStream AsyncWrite速度

NetworkStream AsyncWrite速度
EN

Stack Overflow用户
提问于 2016-06-03 11:32:40
回答 2查看 1.3K关注 0票数 1

我正在学习异步套接字编程,为了一个更困难的项目,我想创建一个用于群聊的服务器。成功地做到了,但我不确定性能是否足够好,并认为我做错了什么。

基本上,我将400个用户连接到服务器上,然后从其中一个用户发送1000条消息(消息为1kB,前缀长度和rest为空)。服务器需要向所有400个用户广播每条消息。服务器上有NetworkStreams列表,当服务器收到消息时,它遍历列表并调用stream.WriteAsync方法。然而,似乎需要服务器40-50毫秒才能将该消息发送给所有400个用户。在测试过程中,服务器CPU使用率为4%,StressClient CPU使用率为55%。我以为它会比40到50毫秒快得多。我是做错什么了还是这个最大速度?

下面是服务器代码(最后两个方法是最相关的,ReceiveMessageAsync和SendToAllAsync)

代码语言:javascript
复制
private List<NetworkStream> connectedUsers = new List<NetworkStream>();
private int processedRequestsAmount = 0;
private Stopwatch sw = new Stopwatch();

public ServerEngine()
{
}

public void Start(IPAddress ipAddress, int port)
{
    TcpListener listener = new TcpListener(ipAddress, port);
    try
    {
        listener.Start();
        AcceptClientsAsync(listener);

        while (true)
        {
            Console.ReadKey(true);
            Console.WriteLine("Processed requests: " + processedRequestsAmount);
        }
    }
    finally
    {
        listener.Stop();
        Console.WriteLine("Server stopped! Press ENTER to close application...");
        Console.ReadLine();
    }
}

private async Task AcceptClientsAsync(TcpListener listener)
{
    while (true)
    {
        try
        {
            TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
            StartClientListenerAsync(client);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

private async Task StartClientListenerAsync(TcpClient client)
{
    using (client)
    {
        var buf = new byte[1024];
        NetworkStream stream = client.GetStream();
        lock (connectedUsers)
        {
            connectedUsers.Add(stream);
        }
        Console.WriteLine(connectedUsers.Count + " users connected!");

        while (true)
        {
            try
            {
                await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                break;
            }
        }

        connectedUsers.Remove(stream);
        Console.WriteLine("User disconnected.");
    }
}

private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
{
    int totalAmountRead = 0;

    // read header (length, 2 bytes total)
    while (totalAmountRead < 2)
    {
        totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
    }

    short totalLength = BitConverter.ToInt16(readBuffer, 0);

    // read rest of the message
    while (totalAmountRead < totalLength)
    {
        totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
    }

    await SendToAllAsync(readBuffer, totalLength);
}

private async Task SendToAllAsync(byte[] buffer, short totalLength)
{
    List<Task> tasks = new List<Task>(connectedUsers.Count);
    if (processedRequestsAmount == 0)
    {
        sw.Start();
    }

    foreach (NetworkStream stream in connectedUsers)
    {
        tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length)); 
    }

    await Task.WhenAll(tasks).ConfigureAwait(false);

    processedRequestsAmount++;
    if (processedRequestsAmount == 1000)
    {
        sw.Stop();
        Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
    }
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-06-04 21:36:34

当我运行服务器和ClientStressTest应用程序而不进行调试( Visual中的ctrl+F5)时,向400个用户发送消息只需要服务器5ms (CPU使用率约为30%),这比我所期望的要好得多。有人能向我解释一下为什么附加调试器会让事情这么慢吗?

无论如何,如果有人需要它来解决这个问题,下面是剩下的代码

ClientStressTest的Program.cs

代码语言:javascript
复制
class Program
{
    static int NumOfClients = 400;
    static int NumOfMessages = 1000;

    static NetworkStream[] Streams = new NetworkStream[NumOfClients];
    static byte[] Message = new byte[1024];

    static void Main(string[] args)
    {
        Buffer.BlockCopy(BitConverter.GetBytes((short)1024), 0, Message, 0, sizeof(short));
        Console.WriteLine("Press ENTER to run setup");
        Console.ReadLine();

        Setup().Wait();


        Console.WriteLine("Press ENTER to start sending");
        Console.ReadLine();


        NetworkStream sender = Streams[0];
        for (int i = 0; i < NumOfMessages; i++)
        {
            sender.WriteAsync(Message, 0, 1024);
        }

        Console.ReadLine();
    }

    static async Task Setup()
    {
        for (int i = 0; i < Streams.Length; i++)
        {
            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect("localhost", 4000);
            NetworkStream stream = tcpClient.GetStream();
            Streams[i] = stream;
            Task.Run(() => CallbackListener(stream));
        }
    }

    static int counter = 0;
    static object objLock = new object();
    static async Task CallbackListener(NetworkStream stream)
    {
        var readBuffer = new byte[1024];
        int totalAmountRead;
        short totalLength;

        while (true)
        {
            totalAmountRead = 0;
            while (totalAmountRead < 2)
            {
                totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
            }

            totalLength = BitConverter.ToInt16(readBuffer, 0);

            while (totalAmountRead < totalLength)
            {
                totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
            }

            lock(objLock)
            {
                counter++;
                if (counter % 1000 == 0)
                {
                    // to see progress
                    Console.WriteLine(counter);
                }
            }
            // do nothing
        }
    }
}

服务器的Program.cs

代码语言:javascript
复制
class Program
{
    static void Main(string[] args)
    {
        var server = new ServerEngine();
        server.Start(IPAddress.Any, 4000);
    }
}

服务器的ServerEngine.cs

代码语言:javascript
复制
public class ServerEngine
{
    private List<NetworkStream> connectedUsers = new List<NetworkStream>();
    private int processedRequestsAmount = 0;
    private Stopwatch sw = new Stopwatch();

    public ServerEngine()
    {
    }

    public void Start(IPAddress ipAddress, int port)
    {
        TcpListener listener = new TcpListener(ipAddress, port);
        try
        {
            listener.Start();
            AcceptClientsAsync(listener);

            while (true)
            {
                Console.ReadKey(true);
                Console.WriteLine("Processed requests: " + processedRequestsAmount);
            }
        }
        finally
        {
            listener.Stop();
            Console.WriteLine("Server stopped! Press ENTER to close application...");
            Console.ReadLine();
        }
    }

    private async Task AcceptClientsAsync(TcpListener listener)
    {
        while (true)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
                StartClientListenerAsync(client);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }

    private async Task StartClientListenerAsync(TcpClient client)
    {
        using (client)
        {
            var buf = new byte[1024];
            NetworkStream stream = client.GetStream();
            lock (connectedUsers)
            {
                connectedUsers.Add(stream);
            }
            Console.WriteLine(connectedUsers.Count + " users connected!");

            while (true)
            {
                try
                {
                    await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    break;
                }
            }

            connectedUsers.Remove(stream);
            Console.WriteLine("User disconnected.");
        }
    }

    private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
    {
        int totalAmountRead = 0;

        // read header (length, 2 bytes total)
        while (totalAmountRead < 2)
        {
            totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
        }

        short totalLength = BitConverter.ToInt16(readBuffer, 0);

        // read rest of the message
        while (totalAmountRead < totalLength)
        {
            totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
        }

        await SendToAll(readBuffer, totalLength).ConfigureAwait(false);
    }

    private async Task SendToAll(byte[] buffer, short totalLength)
    {
        List<Task> tasks = new List<Task>(connectedUsers.Count);
        if (processedRequestsAmount == 0)
        {
            sw.Start();
        }

        foreach (NetworkStream stream in connectedUsers)
        {
            tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length));
        }

        await Task.WhenAll(tasks).ConfigureAwait(false);

        processedRequestsAmount++;
        if (processedRequestsAmount == 1000)
        {
            sw.Stop();
            Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
        }
    }
}
票数 1
EN

Stack Overflow用户

发布于 2016-06-04 18:19:23

多一点-在SendToAllAsync中,您可以压制捕获ExecutionContext

代码语言:javascript
复制
  foreach (NetworkStream stream in connectedUsers)
    {
        ExecutionContext.SuppressFlow();
        tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length)); 
    }

请阅读https://msdn.microsoft.com/en-us/magazine/hh456402.aspx

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37613290

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档