首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >异步AsyncTcpClient (异步TcpClient)

异步AsyncTcpClient (异步TcpClient)
EN

Code Review用户
提问于 2015-02-28 08:52:23
回答 2查看 24.4K关注 0票数 12

几年来,我一直使用C#'s TcpClient进行网络编程。下面的代码是我这些年来开发的TcpClient的异步包装器。

主要的方法是:

  • ConnectAsync() -异步连接;RemoteServerInfo是一个简单的类,包含主机、端口和指示这是否为SSL连接的布尔值。
  • StartReceiving() -启动数据读取回调;此方法是必要的,以便在数据开始处理之前有时间将事件连接起来。
  • DataReceivedCallback() --处理接收到的数据,并将其传递给任何订阅的事件处理程序。
  • SendAsync() -异步发送数据

还有一些需要注意的事情:

  • 代码使用旧的异步编程模型进行异步。
  • 缓冲区大小有一些问题。这样做的目的是具有自适应缓冲区大小--大部分时间使用少量内存,但在必要时扩展以更好地满足更大的传入数据(最多可达到指定的最大值)。
  • 我使用的是goto语句。这可能会让你的脊椎发冷,但我觉得这件事没问题。如果您对在任何情况下都不使用这个答案持宗教态度,请阅读goto

我非常希望其他开发人员(特别是网络程序员)对代码进行审查,看看这个实现是否可以进一步改进。想到的一些事情包括更好的性能、更好地使用水龙头而不是APM,以及我可能错过的任何可能的细微错误。

下面是AsyncTcpClient的代码:

代码语言:javascript
复制
public class AsyncTcpClient : IDisposable
{
    private bool disposed = false;
    private TcpClient tcpClient;
    private Stream stream;

    private int minBufferSize = 8192;
    private int maxBufferSize = 15 * 1024 * 1024;
    private int bufferSize = 8192;

    private int BufferSize
    {
        get
        {
            return this.bufferSize;
        }
        set
        {
            this.bufferSize = value;
            if (this.tcpClient != null)
                this.tcpClient.ReceiveBufferSize = value;
        }
    }

    public int MinBufferSize
    {
        get
        {
            return this.minBufferSize;
        }
        set
        {
            this.minBufferSize = value;
        }
    }

    public int MaxBufferSize
    {
        get
        {
            return this.maxBufferSize;
        }
        set
        {
            this.maxBufferSize = value;
        }
    }

    public int SendBufferSize
    {
        get
        {
            if (this.tcpClient != null)
                return this.tcpClient.SendBufferSize;
            else
                return 0;
        }
        set
        {
            if (this.tcpClient != null)
                this.tcpClient.SendBufferSize = value;
        }
    }

    public event EventHandler<byte[]> OnDataReceived;
    public event EventHandler OnDisconnected;

    public bool IsConnected
    {
        get
        {
            return this.tcpClient != null && this.tcpClient.Connected;
        }
    }

    public AsyncTcpClient()
    {

    }

    public async Task SendAsync(byte[] data)
    {
        try
        {
            await Task.Factory.FromAsync(this.stream.BeginWrite, this.stream.EndWrite, data, 0, data.Length, null);
            await this.stream.FlushAsync();
        }
        catch (IOException ex)
        {
            if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
                ; // ignore
            else if (this.OnDisconnected != null)
                this.OnDisconnected(this, null);
        }
    }

    public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationTokenSource cancellationTokenSource = null)
    {
        try
        {
            await Task.Run(() => this.tcpClient = new TcpClient());
            await Task.Factory.FromAsync(this.tcpClient.BeginConnect, this.tcpClient.EndConnect,
                remoteServerInfo.Host, remoteServerInfo.Port, null);

            // get stream and do SSL handshake if applicable

            this.stream = this.tcpClient.GetStream();

            if (remoteServerInfo.Ssl)
            {
                var sslStream = new SslStream(this.stream);
                sslStream.AuthenticateAsClient(remoteServerInfo.Host);
                this.stream = sslStream;
            }

            if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested)
            {
                this.Dispose();
                this.stream = null;
            }
        }
        catch(Exception)
        {
            // if task has been cancelled, then we don't care about the exception;
            // if it's still running, then the caller must receive the exception

            if (cancellationTokenSource == null || !cancellationTokenSource.IsCancellationRequested)
                throw;
        }
    }

    public void StartReceiving()
    {
        byte[] buffer = new byte[bufferSize];
        this.stream.BeginRead(buffer, 0, buffer.Length, DataReceivedCallback, buffer);
    }

    protected virtual void DataReceivedCallback(IAsyncResult asyncResult)
    {
        try
        {
            byte[] buffer = asyncResult.AsyncState as byte[];
            int bytesRead = this.stream.EndRead(asyncResult);

            if (bytesRead > 0)
            {
                // adapt buffer if it's too small / too large

                if (bytesRead == buffer.Length)
                    this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
                else
                {
                reduceBufferSize:
                    int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
                    if (bytesRead < reducedBufferSize)
                    {
                        this.BufferSize = reducedBufferSize;

                        if (bytesRead > this.minBufferSize)
                            goto reduceBufferSize;
                    }
                }

                // forward received data to subscriber

                if (this.OnDataReceived != null)
                {
                    byte[] data = new byte[bytesRead];
                    Array.Copy(buffer, data, bytesRead);
                    this.OnDataReceived(this, data);
                }

                // recurse

                byte[] newBuffer = new byte[bufferSize];
                this.stream.BeginRead(newBuffer, 0, newBuffer.Length, DataReceivedCallback, newBuffer);
            }
            else
                this.OnDisconnected(this, null);
        }
        catch(ObjectDisposedException) // can occur when closing, because tcpclient and stream are disposed
        {
            // ignore
        }
        catch(IOException ex)
        {
            if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
                ; // ignore
            else if (this.OnDisconnected != null)
                this.OnDisconnected(this, null);
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                // Dispose managed resources.

                if (this.tcpClient != null)
                {
                    this.tcpClient.Close();
                    this.tcpClient = null;
                }
            }

            // There are no unmanaged resources to release, but
            // if we add them, they need to be released here.
        }

        disposed = true;

        // If it is available, make the call to the
        // base class's Dispose(Boolean) method
        // base.Dispose(disposing);
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}
EN

回答 2

Code Review用户

回答已采纳

发布于 2015-02-28 10:00:48

您应该只使用Task.Run启动线程,至少在执行IO时不希望这样做。您应该让运行时做出这个决定。此外,您还需要确保您的tcpClient尚未连接。还有一个tcpClient.ConnectAsync可以还原一个任务,所以您应该使用它。

此外,您也不应该将CancellationTokenSource传递给异步方法,使用CancellationToken。.NET异步库被设计为在取消时抛出一个OperationCanceledException,因此任务被标记为取消,所以请使用它。从内部释放类也是不好的做法,这可能会产生一些非常没有关联的效果,只需关闭并处理tcpListener

因此,ConnectAsync看起来可能是:

代码语言:javascript
复制
private async Task Close()
{
   await Task.Yield();
   if(this.tcpClient != null)
   {

      this.tcpClient.Close();
      this.tcpClient.Dispose();
      this.tcpClient = null;
   }
   if(this.stream != null)
   {
       this.stream.Dispose();
       this.stream = null;
   }
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
    if(token.IsCancellationRequested)
    {
        await this.Close();
        if(onClosed != null)
           onClosed();
        token.ThrowIfCancellationRequested();
    }
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
    try
    {
        //Connect async method
        await this.Close();
        cancellationToken.ThrowIfCancellationRequested();
        this.tcpClient = new TcpClient();
        canellationToken.ThrowIfCancellationRequested();
        await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
        await this.CloseIfCanceled(cancelationToken);
        // get stream and do SSL handshake if applicable

        this.stream = this.tcpClient.GetStream();
        await this.CloseIfCanceled(cancelationToken);
        if (remoteServerInfo.Ssl)
        {
            var sslStream = new SslStream(this.stream);
            sslStream.AuthenticateAsClient(remoteServerInfo.Host);
            this.stream = sslStream;
            await this.CloseIfCanceled(cancelationToken);
        }
    }
    catch(Exception)
    {
        this.CloseIfCanceled(cancelationToken).Wait();
        throw;
    }
}

流上也有异步方法返回任务所以,您的OnDisconected事件也不是线程安全的,您需要将它分配给一个内部变量。您也不应该传递空EventArgs。此外,您还可以简化BeginRecieve,使其只接收并放入包含异步/等待和取消令牌的循环中。另外,我会删除goto并用do/while替换(顺便说一句,我不能100%确定减少缓冲区大小的逻辑可以工作,其他人可能想解决这个问题)

代码语言:javascript
复制
public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
    try
    {
        await this.stream.WriteAsync(data, 0, data.Length, token);
        await this.stream.FlushAsync(token);
    }
    catch (IOException ex)
    {
         var onDisconected = this.OnDisconected;
         if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
            ; // ignore
        else if (onDisconected != null)
            onDisconected(this, EventArgs.Empty);
    }
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
   try
   {
         if(!this.IsConnected || this.IsRecieving)
             throw new InvalidOperationException();
         this.IsRecieving = true;
         byte[] buffer = new byte[bufferSize];
         while(this.IsConnected)
         {
                  token.ThrowIfCancellationRequested();
                  int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
                  if(bytesRead > 0)
                  {
                       if(bytesRead == buffer.Length)
                          this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
                  else
                  {
                     do
                     {
                        int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
                        if(bytesRead < reducedBufferSize)
                           this.BufferSize = reducedBufferSize;

                     }
                     while(bytesRead > this.minBufferSize)
                  }
                  var onDataRecieved = this.OnDataRecieved;
                  if(onDataRecieved != null)
                  {
                     byte[] data = new byte[bytesRead];
                     Array.Copy(buffer, data, bytesRead);
                     onDataRecieved(this, data);
                  }
              }
              buffer = new byte[bufferSize];
         }
   }
   catch(ObjectDisposedException){}
   catch(IOException ex)
   {
       var evt = this.OnDisconnected; 
       if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
            ;
       if(evt != null)
            evt(this, EventArgs.Empty);
   }
   finally
   {
      this.IsRecieving = false;
   }
}
票数 9
EN

Code Review用户

发布于 2015-03-01 02:07:37

公共int MinBufferSize { get {value this.minBufferSize;} set { this.minBufferSize = value;} public int MaxBufferSize { get {value this.maxBufferSize;} set { this.maxBufferSize = value;}}

有什么理由不使用自动实现的属性吗?

代码语言:javascript
复制
public int MinBufferSize { get; set; }

public int MaxBufferSize { get; set; }

如果(ex.InnerException != null && ex.InnerException为ObjectDisposedException) //用于SSL流

你不需要检查null

代码语言:javascript
复制
if (ex.InnerException is ObjectDisposedException) // for SSL streams

来自MSDN

如果提供的表达式为非空,则is表达式的计算结果为true,并且可以将提供的对象转换为提供的类型,而不会引发异常。

票数 4
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/82806

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档