我一直在努力让SocketAsyncEventArgs按照我想要的方式工作。现在我在想:这是否会起作用呢?
/// <summary>
/// The settings to use with this ServerSocket.
/// </summary>
ServerSocketSettings Settings;
/// <summary>
/// The buffer manager used for allocating a buffer block to a SocketAsyncEventArgs.
/// </summary>
BufferManager BufferManager;
/// <summary>
/// The semaphore used for controlling the max connections to the server.
/// </summary>
SemaphoreSlim MaxConnectionsEnforcer;
/// <summary>
/// The socket used for listening for incoming connections.
/// </summary>
Socket ListenSocket;
/// <summary>
/// The pool of re-usable SocketAsyncEventArgs for accept operations.
/// </summary>
SocketAsyncEventArgsPool PoolOfAcceptEventArgs;
/// <summary>
/// The pool of re-usable SocketAsyncEventArgs for receiving data.
/// </summary>
SocketAsyncEventArgsPool PoolOfRecEventArgs;
/// <summary>
/// The pool of re-usable SocketAsyncEventArgs for sending data.
/// </summary>
SocketAsyncEventArgsPool PoolOfSendEventArgs;
/// <summary>
/// Initializes a new instance of the non-blocking I/O ServerSocket.
/// </summary>
/// <remarks>
/// Only creates the buffer manager, the three SocketAsyncEventArgs pools and the
/// connection-limiting semaphore. The pools themselves are filled by Init().
/// </remarks>
/// <param name="settings">The settings to use with this ServerSocket.</param>
/// <exception cref="ArgumentNullException"><paramref name="settings"/> is null.</exception>
public ServerSocket(ServerSocketSettings settings)
{
    if (settings == null)
    {
        throw new ArgumentNullException("settings");
    }

    this.Settings = settings;

    // Init() assigns one block to every receive AND every send SocketAsyncEventArgs, and each
    // block is BufferSize * OpsToPreAllocate bytes (second argument). The total therefore has
    // to cover (receive + send) blocks of that size. The previous expression multiplied only
    // the send share by OpsToPreAllocate because '*' binds tighter than '+'.
    // NOTE(review): assumes BufferManager(totalBytes, bytesPerBlock) - confirm against BufferManager.
    this.BufferManager = new BufferManager(
        this.Settings.BufferSize * this.Settings.OpsToPreAllocate * (this.Settings.NumOfSaeaForRec + this.Settings.NumOfSaeaForSend),
        this.Settings.BufferSize * this.Settings.OpsToPreAllocate);

    this.PoolOfAcceptEventArgs = new SocketAsyncEventArgsPool(this.Settings.MaxSimultaneousAcceptOps);
    this.PoolOfRecEventArgs = new SocketAsyncEventArgsPool(this.Settings.NumOfSaeaForRec);
    this.PoolOfSendEventArgs = new SocketAsyncEventArgsPool(this.Settings.NumOfSaeaForSend);

    // Initial count == maximum count: every slot is free until StartAccept takes one.
    this.MaxConnectionsEnforcer = new SemaphoreSlim(this.Settings.MaxConnections, this.Settings.MaxConnections);
}
/// <summary>
/// Allocates the shared buffer and pre-populates the accept, receive and send pools
/// with ready-to-use SocketAsyncEventArgs objects.
/// </summary>
internal void Init()
{
    this.BufferManager.InitBuffer();
    FillAcceptPool();
    FillReceivePool();
    FillSendPool();
}

/// <summary>
/// Creates MaxSimultaneousAcceptOps accept objects. They get no buffer block and no user token.
/// </summary>
private void FillAcceptPool()
{
    for (int i = 0; i < this.Settings.MaxSimultaneousAcceptOps; i++)
    {
        SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
        this.PoolOfAcceptEventArgs.Push(acceptEventArg);
    }
}

/// <summary>
/// Creates NumOfSaeaForRec receive objects, each with a buffer block and a Connection token
/// whose socket is filled in later by ProcessAccept.
/// </summary>
private void FillReceivePool()
{
    for (int i = 0; i < this.Settings.NumOfSaeaForRec; i++)
    {
        SocketAsyncEventArgs receiveEventArg = new SocketAsyncEventArgs();
        this.BufferManager.SetBuffer(receiveEventArg);
        receiveEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_ReceiveCompleted);
        receiveEventArg.UserToken = new Connection(null, this);
        this.PoolOfRecEventArgs.Push(receiveEventArg);
    }
}

/// <summary>
/// Creates NumOfSaeaForSend send objects, each with a buffer block and a SendDataToken.
/// </summary>
private void FillSendPool()
{
    for (int i = 0; i < this.Settings.NumOfSaeaForSend; i++)
    {
        SocketAsyncEventArgs sendEventArg = new SocketAsyncEventArgs();
        this.BufferManager.SetBuffer(sendEventArg);
        sendEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_SendCompleted);
        sendEventArg.UserToken = new SendDataToken();
        this.PoolOfSendEventArgs.Push(sendEventArg);
    }
}
/// <summary>
/// Creates the TCP listening socket, binds it to the configured endpoint and starts
/// listening with the configured backlog. Does not post any accept operation itself.
/// </summary>
public void StartListen()
{
    // Store the socket in the field first so it stays reachable even if Bind or Listen throws.
    Socket listener = new Socket(this.Settings.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    this.ListenSocket = listener;

    listener.Bind(this.Settings.Endpoint);
    listener.Listen(this.Settings.Backlog);
}
/// <summary>
/// Posts one asynchronous accept on the listening socket, provided an accept object is
/// available in the pool. Blocks until the connection semaphore grants a slot.
/// </summary>
internal void StartAccept()
{
    SocketAsyncEventArgs pooledAcceptArgs;
    if (!this.PoolOfAcceptEventArgs.TryPop(out pooledAcceptArgs))
    {
        // No accept object available: nothing is posted.
        return;
    }

    // One semaphore slot per accepted connection.
    this.MaxConnectionsEnforcer.Wait();

    // AcceptAsync returns false when it completed synchronously; the Completed event is
    // not raised in that case, so the result is processed inline.
    bool pending = this.ListenSocket.AcceptAsync(pooledAcceptArgs);
    if (pending)
    {
        return;
    }

    ProcessAccept(pooledAcceptArgs);
}
/// <summary>
/// Completed handler for accept objects; forwards the finished operation to ProcessAccept.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">The accept operation that completed.</param>
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
    this.ProcessAccept(e);
}
/// <summary>
/// Handles a completed accept: on success hands the accepted socket to a pooled receive
/// object and starts receiving; on failure cleans up and keeps the accept loop alive.
/// </summary>
/// <param name="acceptEventArgs">The accept operation that completed.</param>
/// <exception cref="InvalidOperationException">No receive object was available in the pool.</exception>
private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
{
    if (acceptEventArgs.SocketError != SocketError.Success)
    {
        // Read the error before the object goes back to the pool, where it may be reused.
        SocketError acceptError = acceptEventArgs.SocketError;
        HandleBadAccept(acceptEventArgs);

        // Previously the method just returned here, so a single failed accept ended the
        // accept loop. Post a replacement accept unless the operation was aborted.
        // NOTE(review): OperationAborted is presumably what a closed listening socket
        // reports during Shutdown() - confirm on the target platform.
        // NOTE(review): StartAccept takes another semaphore slot; the slot of the failed
        // accept is only given back if HandleBadAccept releases it.
        if (acceptError != SocketError.OperationAborted)
        {
            StartAccept();
        }
        return;
    }

    // Post the next accept before dealing with this connection.
    StartAccept();

    SocketAsyncEventArgs recEventArgs;
    if (this.PoolOfRecEventArgs.TryPop(out recEventArgs))
    {
        // Move the accepted socket to the receive object's Connection and detach it from
        // the accept object so that object can be reused for another accept.
        ((Connection)recEventArgs.UserToken).Socket = acceptEventArgs.AcceptSocket;
        acceptEventArgs.AcceptSocket = null;
        this.PoolOfAcceptEventArgs.Push(acceptEventArgs);
        StartReceive(recEventArgs);
    }
    else
    {
        HandleBadAccept(acceptEventArgs);
        throw new InvalidOperationException("We starved the receive pool for objects, make sure it matches the max connections.");
    }
}
/// <summary>
/// Completed handler for send objects; forwards the finished operation to ProcessSend.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">The send operation that completed.</param>
private void IO_SendCompleted(object sender, SocketAsyncEventArgs e)
{
    this.ProcessSend(e);
}
/// <summary>
/// Completed handler for receive objects; forwards the finished operation to ProcessReceive.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">The receive operation that completed.</param>
private void IO_ReceiveCompleted(object sender, SocketAsyncEventArgs e)
{
    this.ProcessReceive(e);
}
/// <summary>
/// Posts an asynchronous receive on the socket held by the object's Connection token.
/// </summary>
/// <param name="receiveEventArgs">Receive object whose UserToken is a Connection.</param>
private void StartReceive(SocketAsyncEventArgs receiveEventArgs)
{
    Connection connection = (Connection)receiveEventArgs.UserToken;

    // ReceiveAsync returns true when the operation is pending; IO_ReceiveCompleted will
    // then be raised later and nothing more is done here.
    if (connection.Socket.ReceiveAsync(receiveEventArgs))
    {
        return;
    }

    // Completed synchronously: no event is raised, so process the result inline.
    ProcessReceive(receiveEventArgs);
}
/// <summary>
/// Handles a completed receive: keeps the receive loop going while data arrives, otherwise
/// closes the client socket and returns the receive object to its pool.
/// </summary>
/// <param name="receiveEventArgs">The receive operation that completed.</param>
private void ProcessReceive(SocketAsyncEventArgs receiveEventArgs)
{
    // Zero bytes transferred or any socket error ends the connection.
    bool connectionEnded = receiveEventArgs.BytesTransferred <= 0
        || receiveEventArgs.SocketError != SocketError.Success;

    if (connectionEnded)
    {
        CloseClientSocket(receiveEventArgs);
        ReturnReceiveSaea(receiveEventArgs);
        return;
    }

    // NEED TO ADD RECEIVE DATA HERE ETC
    StartReceive(receiveEventArgs);
}
/// <summary>
/// Sends <paramref name="data"/> on <paramref name="socket"/> using a pooled send object.
/// </summary>
/// <param name="socket">The connected client socket to send on.</param>
/// <param name="data">The bytes to send.</param>
/// <exception cref="ArgumentNullException"><paramref name="socket"/> or <paramref name="data"/> is null.</exception>
/// <exception cref="InvalidOperationException">No send object was available in the pool.</exception>
public void SendData(Socket socket, byte[] data)
{
    if (socket == null)
    {
        throw new ArgumentNullException("socket");
    }
    if (data == null)
    {
        throw new ArgumentNullException("data");
    }

    // The TryPop result used to be ignored, so an empty pool surfaced as a
    // NullReferenceException on the next line.
    SocketAsyncEventArgs sendEventArgs;
    if (!this.PoolOfSendEventArgs.TryPop(out sendEventArgs))
    {
        throw new InvalidOperationException("We starved the send pool for objects, make sure it is large enough for the number of concurrent sends.");
    }

    SendDataToken token = (SendDataToken)sendEventArgs.UserToken;
    token.DataToSend = data;

    // Set explicitly: nothing visible in this file initialises the remaining count, and
    // StartSend sizes every chunk from it.
    token.SendBytesRemainingCount = data.Length;

    // The send object carries its target socket in AcceptSocket; StartSend reads it from there.
    sendEventArgs.AcceptSocket = socket;
    StartSend(sendEventArgs);
}
/// <summary>
/// Copies the next chunk of the token's pending data into the send object's buffer block
/// and posts an asynchronous send on the socket stored in AcceptSocket.
/// </summary>
/// <param name="sendEventArgs">Send object whose UserToken is a SendDataToken.</param>
private void StartSend(SocketAsyncEventArgs sendEventArgs)
{
    SendDataToken sendState = (SendDataToken)sendEventArgs.UserToken;

    // A chunk is everything that is left, capped at the configured buffer size.
    int chunkSize = Math.Min(sendState.SendBytesRemainingCount, this.Settings.BufferSize);

    // Keep the block's offset and shrink or grow the visible window to the chunk.
    sendEventArgs.SetBuffer(sendEventArgs.Offset, chunkSize);
    Buffer.BlockCopy(
        sendState.DataToSend,
        sendState.BytesSentAlreadyCount,
        sendEventArgs.Buffer,
        sendEventArgs.Offset,
        chunkSize);

    // SendAsync returns false on synchronous completion; no event is raised then.
    if (!sendEventArgs.AcceptSocket.SendAsync(sendEventArgs))
    {
        ProcessSend(sendEventArgs);
    }
}
/// <summary>
/// Handles a completed send: sends the next chunk while data remains, returns the send
/// object to its pool when finished, and closes the socket when the send failed.
/// </summary>
/// <param name="sendEventArgs">The send operation that completed.</param>
private void ProcessSend(SocketAsyncEventArgs sendEventArgs)
{
    SendDataToken token = (SendDataToken)sendEventArgs.UserToken;

    if (sendEventArgs.SocketError == SocketError.Success)
    {
        token.SendBytesRemainingCount = token.SendBytesRemainingCount - sendEventArgs.BytesTransferred;
        if (token.SendBytesRemainingCount == 0)
        {
            token.Reset();
            // Do not keep a pooled object pointing at the client socket.
            sendEventArgs.AcceptSocket = null;
            this.PoolOfSendEventArgs.Push(sendEventArgs);
        }
        else
        {
            token.BytesSentAlreadyCount += sendEventArgs.BytesTransferred;
            StartSend(sendEventArgs);
        }
        return;
    }

    token.Reset();

    // Send objects carry a SendDataToken, not a Connection, so CloseClientSocket (which
    // casts UserToken to Connection) threw InvalidCastException here and the object never
    // reached the pool. Close the socket the send was posted on instead.
    // NOTE(review): presumably the receive loop for this socket then fails and performs the
    // Connection cleanup via ProcessReceive - confirm for sockets not accepted by this server.
    Socket failedSocket = sendEventArgs.AcceptSocket;
    sendEventArgs.AcceptSocket = null;
    if (failedSocket != null)
    {
        try
        {
            failedSocket.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // Best effort: the peer may already be gone.
        }
        catch (ObjectDisposedException)
        {
            // Best effort: the socket was already closed elsewhere.
        }
        failedSocket.Close();
    }

    ReturnSendSaea(sendEventArgs);
}
/// <summary>
/// Shuts down and closes the socket of the Connection stored in the object's UserToken,
/// then notifies the Connection that it was closed.
/// </summary>
/// <param name="args">An object whose UserToken is a Connection.</param>
private void CloseClientSocket(SocketAsyncEventArgs args)
{
    Connection con = (Connection)args.UserToken;

    // A pooled Connection starts without a socket; guard so that case no longer escapes
    // as a NullReferenceException from Close().
    Socket clientSocket = con.Socket;
    if (clientSocket != null)
    {
        try
        {
            clientSocket.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // Best effort: the connection may already be broken.
        }
        catch (ObjectDisposedException)
        {
            // Best effort: the socket was already closed elsewhere.
        }
        clientSocket.Close();
    }

    con.OnConnectionClose();
}
/// <summary>
/// Returns a receive object to its pool and frees one connection slot on the semaphore.
/// </summary>
/// <param name="args">The receive object to recycle.</param>
private void ReturnReceiveSaea(SocketAsyncEventArgs args)
{
    SocketAsyncEventArgsPool receivePool = this.PoolOfRecEventArgs;
    receivePool.Push(args);

    // Counterpart of the Wait() in StartAccept.
    SemaphoreSlim connectionSlots = this.MaxConnectionsEnforcer;
    connectionSlots.Release();
}
/// <summary>
/// Returns a send object to the send pool.
/// </summary>
/// <param name="args">The send object to recycle.</param>
private void ReturnSendSaea(SocketAsyncEventArgs args)
{
    SocketAsyncEventArgsPool sendPool = this.PoolOfSendEventArgs;
    sendPool.Push(args);
}
/// <summary>
/// Cleans up after an accept that cannot be used: closes the accept socket if there is one,
/// recycles the accept object and gives back the connection slot taken for it.
/// </summary>
/// <param name="acceptEventArgs">The accept object to clean up and recycle.</param>
private void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
    // Detach the socket before pooling: a pooled object that still references a closed
    // socket cannot be used for the next AcceptAsync.
    Socket rejectedSocket = acceptEventArgs.AcceptSocket;
    acceptEventArgs.AcceptSocket = null;

    // AcceptSocket can be null when the accept failed before a socket was created.
    if (rejectedSocket != null)
    {
        rejectedSocket.Close();
    }

    this.PoolOfAcceptEventArgs.Push(acceptEventArgs);

    // StartAccept took a slot for this accept; no connection will ever return it through
    // ReturnReceiveSaea, so release it here to avoid leaking slots.
    this.MaxConnectionsEnforcer.Release();
}
/// <summary>
/// Stops listening and disposes all pooled SocketAsyncEventArgs objects.
/// </summary>
internal void Shutdown()
{
    // A listening socket is never connected, so Socket.Shutdown throws a SocketException
    // on it and the cleanup below was skipped. Closing the listener is sufficient.
    Socket listener = this.ListenSocket;
    if (listener != null)
    {
        // Null when StartListen was never called.
        listener.Close();
    }

    DisposeAllSaeaObjects();
}
/// <summary>
/// Calls Dispose on the accept, send and receive pools, in that order.
/// </summary>
// NOTE(review): presumably each pool disposes the SocketAsyncEventArgs it holds - confirm in SocketAsyncEventArgsPool.
private void DisposeAllSaeaObjects()
{
this.PoolOfAcceptEventArgs.Dispose();
this.PoolOfSendEventArgs.Dispose();
this.PoolOfRecEventArgs.Dispose();
}
让一个接收操作不断循环地接收数据,同时另外使用一个包含多个发送操作的池,这样可以吗?这背后的原因是我不想在接收之后立刻发送数据。我想在需要的时候再发送数据;而接收操作则不同,我希望它不断循环地接收客户端的数据。这样做会快速有效吗?
现在我需要处理的唯一事情是用这个代码正确地接收数据并解码数据。
发布于 2014-03-27 15:03:41
遗憾的是,下面只是一些观察结果,与您主要关心的问题没有直接关系;但从代码审查的角度来看,希望它们仍然是有价值的意见:
- 构造函数中有不少 new 指令,所以如果可能的话,我会尝试将这些指令移出类之外。
- Init() 承担了太多的责任,我会从该代码中提取 3 个方法(每个循环一个)。
- SocketAsyncEventArgs.UserToken 的类型是 object,这是为了方便使用。您似乎有时在其中放置一个 Connection,有时又放置一个 SendDataToken。随着代码的维护和演进,这可能会引起混淆,并导致 InvalidCastException。您可以创建一个完整公开所需内容的类,并且始终只将 UserToken 强制转换为该类型:
public class CustomUserToken
{
/// <summary>
/// Creates a token that carries both the connection and the send state.
/// </summary>
/// <param name="connection">The connection to expose through the Connection property.</param>
/// <param name="token">The send state to expose through the Token property.</param>
// A constructor declaration takes no 'class' keyword; with it the snippet does not compile.
public CustomUserToken(Connection connection, SendDataToken token)
{
    _connection = connection;
    _token = token;
}
// Backing field for the Connection property; assigned once in the constructor.
private readonly Connection _connection;
/// <summary>Gets the connection passed to the constructor.</summary>
public Connection Connection { get { return _connection; } }
// Backing field for the Token property; assigned once in the constructor.
private readonly SendDataToken _token;
/// <summary>Gets the send-data token passed to the constructor.</summary>
public SendDataToken Token { get { return _token; } }
}
现在,您可以始终将 SocketAsyncEventArgs.UserToken 转换为 CustomUserToken,并从中获取 Connection 和 Token。
发布于 2018-06-08 20:58:32
我注意到用于SendAsync的套接字是SAEA的AcceptSocket。然而,对于ReceiveAsync,您可以使用UserToken的套接字。您能同时使用相同的实例吗?
https://codereview.stackexchange.com/questions/8361
复制相似问题