几年来,我一直使用C#'s TcpClient进行网络编程。下面的代码是我这些年来开发的TcpClient的异步包装器。
主要的方法是:
ConnectAsync() -异步连接;RemoteServerInfo是一个简单的类,包含主机、端口和指示这是否为SSL连接的布尔值。StartReceiving() -启动数据读取回调;此方法是必要的,以便在数据开始处理之前有时间将事件连接起来。DataReceivedCallback() --处理接收到的数据,并将其传递给任何订阅的事件处理程序。SendAsync() -异步发送数据还有一些需要注意的事情:
goto语句。这可能会让你的脊椎发冷,但我觉得这件事没问题。如果您对在任何情况下都不使用这个答案持宗教态度,请阅读goto。我非常希望其他开发人员(特别是网络程序员)对代码进行审查,看看这个实现是否可以进一步改进。想到的一些事情包括更好的性能、更好地使用水龙头而不是APM,以及我可能错过的任何可能的细微错误。
下面是AsyncTcpClient的代码:
public class AsyncTcpClient : IDisposable
{
private bool disposed = false;
private TcpClient tcpClient;
private Stream stream;
private int minBufferSize = 8192;
private int maxBufferSize = 15 * 1024 * 1024;
private int bufferSize = 8192;
private int BufferSize
{
get
{
return this.bufferSize;
}
set
{
this.bufferSize = value;
if (this.tcpClient != null)
this.tcpClient.ReceiveBufferSize = value;
}
}
public int MinBufferSize
{
get
{
return this.minBufferSize;
}
set
{
this.minBufferSize = value;
}
}
public int MaxBufferSize
{
get
{
return this.maxBufferSize;
}
set
{
this.maxBufferSize = value;
}
}
public int SendBufferSize
{
get
{
if (this.tcpClient != null)
return this.tcpClient.SendBufferSize;
else
return 0;
}
set
{
if (this.tcpClient != null)
this.tcpClient.SendBufferSize = value;
}
}
public event EventHandler<byte[]> OnDataReceived;
public event EventHandler OnDisconnected;
public bool IsConnected
{
get
{
return this.tcpClient != null && this.tcpClient.Connected;
}
}
public AsyncTcpClient()
{
}
public async Task SendAsync(byte[] data)
{
try
{
await Task.Factory.FromAsync(this.stream.BeginWrite, this.stream.EndWrite, data, 0, data.Length, null);
await this.stream.FlushAsync();
}
catch (IOException ex)
{
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (this.OnDisconnected != null)
this.OnDisconnected(this, null);
}
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationTokenSource cancellationTokenSource = null)
{
try
{
await Task.Run(() => this.tcpClient = new TcpClient());
await Task.Factory.FromAsync(this.tcpClient.BeginConnect, this.tcpClient.EndConnect,
remoteServerInfo.Host, remoteServerInfo.Port, null);
// get stream and do SSL handshake if applicable
this.stream = this.tcpClient.GetStream();
if (remoteServerInfo.Ssl)
{
var sslStream = new SslStream(this.stream);
sslStream.AuthenticateAsClient(remoteServerInfo.Host);
this.stream = sslStream;
}
if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested)
{
this.Dispose();
this.stream = null;
}
}
catch(Exception)
{
// if task has been cancelled, then we don't care about the exception;
// if it's still running, then the caller must receive the exception
if (cancellationTokenSource == null || !cancellationTokenSource.IsCancellationRequested)
throw;
}
}
public void StartReceiving()
{
byte[] buffer = new byte[bufferSize];
this.stream.BeginRead(buffer, 0, buffer.Length, DataReceivedCallback, buffer);
}
protected virtual void DataReceivedCallback(IAsyncResult asyncResult)
{
try
{
byte[] buffer = asyncResult.AsyncState as byte[];
int bytesRead = this.stream.EndRead(asyncResult);
if (bytesRead > 0)
{
// adapt buffer if it's too small / too large
if (bytesRead == buffer.Length)
this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
else
{
reduceBufferSize:
int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
if (bytesRead < reducedBufferSize)
{
this.BufferSize = reducedBufferSize;
if (bytesRead > this.minBufferSize)
goto reduceBufferSize;
}
}
// forward received data to subscriber
if (this.OnDataReceived != null)
{
byte[] data = new byte[bytesRead];
Array.Copy(buffer, data, bytesRead);
this.OnDataReceived(this, data);
}
// recurse
byte[] newBuffer = new byte[bufferSize];
this.stream.BeginRead(newBuffer, 0, newBuffer.Length, DataReceivedCallback, newBuffer);
}
else
this.OnDisconnected(this, null);
}
catch(ObjectDisposedException) // can occur when closing, because tcpclient and stream are disposed
{
// ignore
}
catch(IOException ex)
{
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (this.OnDisconnected != null)
this.OnDisconnected(this, null);
}
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
// Dispose managed resources.
if (this.tcpClient != null)
{
this.tcpClient.Close();
this.tcpClient = null;
}
}
// There are no unmanaged resources to release, but
// if we add them, they need to be released here.
}
disposed = true;
// If it is available, make the call to the
// base class's Dispose(Boolean) method
// base.Dispose(disposing);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}发布于 2015-02-28 10:00:48
您应该只使用Task.Run启动线程,至少在执行IO时不希望这样做。您应该让运行时做出这个决定。此外,您还需要确保您的tcpClient尚未连接。还有一个tcpClient.ConnectAsync可以还原一个任务,所以您应该使用它。
此外,您也不应该将CancellationTokenSource传递给异步方法,使用CancellationToken。.NET异步库被设计为在取消时抛出一个OperationCanceledException,因此任务被标记为取消,所以请使用它。从内部释放类也是不好的做法,这可能会产生一些非常没有关联的效果,只需关闭并处理tcpListener
因此,ConnectAsync看起来可能是:
private async Task Close()
{
await Task.Yield();
if(this.tcpClient != null)
{
this.tcpClient.Close();
this.tcpClient.Dispose();
this.tcpClient = null;
}
if(this.stream != null)
{
this.stream.Dispose();
this.stream = null;
}
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
if(token.IsCancellationRequested)
{
await this.Close();
if(onClosed != null)
onClosed();
token.ThrowIfCancellationRequested();
}
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
//Connect async method
await this.Close();
cancellationToken.ThrowIfCancellationRequested();
this.tcpClient = new TcpClient();
canellationToken.ThrowIfCancellationRequested();
await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
await this.CloseIfCanceled(cancelationToken);
// get stream and do SSL handshake if applicable
this.stream = this.tcpClient.GetStream();
await this.CloseIfCanceled(cancelationToken);
if (remoteServerInfo.Ssl)
{
var sslStream = new SslStream(this.stream);
sslStream.AuthenticateAsClient(remoteServerInfo.Host);
this.stream = sslStream;
await this.CloseIfCanceled(cancelationToken);
}
}
catch(Exception)
{
this.CloseIfCanceled(cancelationToken).Wait();
throw;
}
}流上也有异步方法返回任务所以,您的OnDisconected事件也不是线程安全的,您需要将它分配给一个内部变量。您也不应该传递空EventArgs。此外,您还可以简化BeginRecieve,使其只接收并放入包含异步/等待和取消令牌的循环中。另外,我会删除goto并用do/while替换(顺便说一句,我不能100%确定减少缓冲区大小的逻辑可以工作,其他人可能想解决这个问题)
public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
try
{
await this.stream.WriteAsync(data, 0, data.Length, token);
await this.stream.FlushAsync(token);
}
catch (IOException ex)
{
var onDisconected = this.OnDisconected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
; // ignore
else if (onDisconected != null)
onDisconected(this, EventArgs.Empty);
}
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
try
{
if(!this.IsConnected || this.IsRecieving)
throw new InvalidOperationException();
this.IsRecieving = true;
byte[] buffer = new byte[bufferSize];
while(this.IsConnected)
{
token.ThrowIfCancellationRequested();
int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
if(bytesRead > 0)
{
if(bytesRead == buffer.Length)
this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
else
{
do
{
int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
if(bytesRead < reducedBufferSize)
this.BufferSize = reducedBufferSize;
}
while(bytesRead > this.minBufferSize)
}
var onDataRecieved = this.OnDataRecieved;
if(onDataRecieved != null)
{
byte[] data = new byte[bytesRead];
Array.Copy(buffer, data, bytesRead);
onDataRecieved(this, data);
}
}
buffer = new byte[bufferSize];
}
}
catch(ObjectDisposedException){}
catch(IOException ex)
{
var evt = this.OnDisconnected;
if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
;
if(evt != null)
evt(this, EventArgs.Empty);
}
finally
{
this.IsRecieving = false;
}
}发布于 2015-03-01 02:07:37
公共int MinBufferSize { get {value this.minBufferSize;} set { this.minBufferSize = value;} public int MaxBufferSize { get {value this.maxBufferSize;} set { this.maxBufferSize = value;}}
有什么理由不使用自动实现的属性吗?
public int MinBufferSize { get; set; }
public int MaxBufferSize { get; set; }如果(ex.InnerException != null && ex.InnerException为ObjectDisposedException) //用于SSL流
你不需要检查null
if (ex.InnerException is ObjectDisposedException) // for SSL streams来自MSDN:
如果提供的表达式为非空,则is表达式的计算结果为true,并且可以将提供的对象转换为提供的类型,而不会引发异常。
https://codereview.stackexchange.com/questions/82806
复制相似问题