首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SocketAsyncEventArgs收发

SocketAsyncEventArgs收发
EN

Code Review用户
提问于 2012-01-27 15:01:40
回答 2查看 7.9K关注 0票数 8

我一直在努力让SocketAsyncEventArgs按照我想要的方式工作。现在我在想:这是否会起作用呢?

代码语言:javascript
复制
/// <summary>
/// The settings to use with this ServerSocket.
/// </summary>
ServerSocketSettings Settings;

/// <summary>
/// The buffer manager for allocating a buffer block to a SocketAsyncEventArgs.
/// </summary>
BufferManager BufferManager;

/// <summary>
/// The semaphore used for controlling the max connections to the server.
/// </summary>
SemaphoreSlim MaxConnectionsEnforcer;

/// <summary>
/// The socket used for listening for incoming connections.
/// </summary>
Socket ListenSocket;

/// <summary>
/// The pool of re-usable SocketAsyncEventArgs for accept operations.
/// </summary>
SocketAsyncEventArgsPool PoolOfAcceptEventArgs;

/// <summary>
/// The pool of re-usable SocketAsyncEventArgs for receiving data.
/// </summary>
SocketAsyncEventArgsPool PoolOfRecEventArgs;

/// <summary>
/// The pool of re-usable SocketAsyncEventArgs for sending data.
/// </summary>
SocketAsyncEventArgsPool PoolOfSendEventArgs;

/// <summary>
/// Initializes a new instance of the non-blocking I/O ServerSocket.
/// </summary>
/// <remarks>
/// Only creates the buffer manager, the three SocketAsyncEventArgs pools and the
/// connection-limiting semaphore; nothing is pre-filled or bound here.
/// </remarks>
/// <param name="settings">The settings to use with this ServerSocket.</param>
/// <exception cref="ArgumentNullException"><paramref name="settings"/> is null.</exception>
public ServerSocket(ServerSocketSettings settings)
{
    if (settings == null)
    {
        throw new ArgumentNullException("settings");
    }

    this.Settings = settings;

    // Every receive AND every send SAEA is handed a slice via BufferManager.SetBuffer in Init,
    // so the total must cover (receive + send) slices. The previous expression multiplied only
    // the send term by OpsToPreAllocate because '*' binds tighter than '+'.
    // NOTE(review): assumes the second BufferManager argument is the per-SAEA slice size — confirm.
    var bytesPerSaea = settings.BufferSize * settings.OpsToPreAllocate;
    var saeaCount = settings.NumOfSaeaForRec + settings.NumOfSaeaForSend;
    this.BufferManager = new BufferManager(bytesPerSaea * saeaCount, bytesPerSaea);

    this.PoolOfAcceptEventArgs = new SocketAsyncEventArgsPool(settings.MaxSimultaneousAcceptOps);
    this.PoolOfRecEventArgs = new SocketAsyncEventArgsPool(settings.NumOfSaeaForRec);
    this.PoolOfSendEventArgs = new SocketAsyncEventArgsPool(settings.NumOfSaeaForSend);

    // Starts with every slot free; StartAccept takes one slot per posted accept.
    this.MaxConnectionsEnforcer = new SemaphoreSlim(settings.MaxConnections, settings.MaxConnections);
}

/// <summary>
/// Initializes the shared buffer and pre-fills the accept, receive and send pools
/// with ready-to-use SocketAsyncEventArgs objects.
/// </summary>
internal void Init()
{
    this.BufferManager.InitBuffer();

    FillAcceptPool();
    FillReceivePool();
    FillSendPool();
}

/// <summary>
/// Pushes MaxSimultaneousAcceptOps accept objects (no buffer, no user token) into the accept pool.
/// </summary>
private void FillAcceptPool()
{
    for (int i = 0; i < this.Settings.MaxSimultaneousAcceptOps; i++)
    {
        SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
        acceptEventArg.Completed += AcceptEventArg_Completed;

        this.PoolOfAcceptEventArgs.Push(acceptEventArg);
    }
}

/// <summary>
/// Pushes NumOfSaeaForRec receive objects into the receive pool, each with a buffer
/// slice and a Connection token whose socket is assigned later in ProcessAccept.
/// </summary>
private void FillReceivePool()
{
    for (int i = 0; i < this.Settings.NumOfSaeaForRec; i++)
    {
        SocketAsyncEventArgs receiveEventArg = new SocketAsyncEventArgs();
        this.BufferManager.SetBuffer(receiveEventArg);

        receiveEventArg.Completed += IO_ReceiveCompleted;
        receiveEventArg.UserToken = new Connection(null, this);

        this.PoolOfRecEventArgs.Push(receiveEventArg);
    }
}

/// <summary>
/// Pushes NumOfSaeaForSend send objects into the send pool, each with a buffer
/// slice and a fresh SendDataToken.
/// </summary>
private void FillSendPool()
{
    for (int i = 0; i < this.Settings.NumOfSaeaForSend; i++)
    {
        SocketAsyncEventArgs sendEventArg = new SocketAsyncEventArgs();
        this.BufferManager.SetBuffer(sendEventArg);

        sendEventArg.Completed += IO_SendCompleted;
        sendEventArg.UserToken = new SendDataToken();

        this.PoolOfSendEventArgs.Push(sendEventArg);
    }
}

/// <summary>
/// Creates the TCP listening socket, binds it to the configured endpoint and
/// starts listening with the configured backlog.
/// </summary>
public void StartListen()
{
    var localEndpoint = this.Settings.Endpoint;

    // Assign the field before Bind/Listen so the socket is reachable even if they throw.
    this.ListenSocket = new Socket(localEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    this.ListenSocket.Bind(localEndpoint);
    this.ListenSocket.Listen(this.Settings.Backlog);
}

/// <summary>
/// Posts one asynchronous accept on the listening socket, using an object from the accept pool.
/// Does nothing when the accept pool is empty. Blocks on the connection semaphore
/// until a connection slot is available.
/// </summary>
internal void StartAccept()
{
    SocketAsyncEventArgs pooledAcceptArgs;

    if (!this.PoolOfAcceptEventArgs.TryPop(out pooledAcceptArgs))
    {
        return;
    }

    // Take a connection slot before posting the accept.
    this.MaxConnectionsEnforcer.Wait();

    bool completesLater = this.ListenSocket.AcceptAsync(pooledAcceptArgs);
    if (completesLater)
    {
        // AcceptEventArg_Completed will run ProcessAccept.
        return;
    }

    // Completed synchronously: the Completed event is not raised, so handle it here.
    ProcessAccept(pooledAcceptArgs);
}

/// <summary>
/// Completed-event handler attached to every accept SocketAsyncEventArgs in Init;
/// forwards the finished accept operation to ProcessAccept.
/// </summary>
/// <param name="sender">The event source (unused).</param>
/// <param name="e">The accept operation that completed.</param>
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
    ProcessAccept(e);
}

/// <summary>
/// Handles a completed accept: recycles the accept object, posts the next accept and
/// starts receiving on the newly accepted socket.
/// </summary>
/// <param name="acceptEventArgs">The accept operation that completed.</param>
/// <exception cref="InvalidOperationException">
/// The receive pool has no free object for the new connection.
/// </exception>
private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
{
    if (acceptEventArgs.SocketError != SocketError.Success)
    {
        HandleBadAccept(acceptEventArgs);

        // StartAccept took a connection slot for this accept; no connection was
        // established, so give the slot back instead of leaking it.
        this.MaxConnectionsEnforcer.Release();

        // NOTE(review): no new accept is posted on this path — confirm that is intended.
        return;
    }

    // Detach the accepted socket and recycle the accept object BEFORE posting the next
    // accept; otherwise StartAccept finds the pool empty when it holds a single object.
    Socket acceptedSocket = acceptEventArgs.AcceptSocket;
    acceptEventArgs.AcceptSocket = null;
    this.PoolOfAcceptEventArgs.Push(acceptEventArgs);

    // NOTE(review): StartAccept may block on the connection semaphore before the
    // receive below is started — confirm this ordering is acceptable.
    StartAccept();

    SocketAsyncEventArgs recEventArgs;

    if (!this.PoolOfRecEventArgs.TryPop(out recEventArgs))
    {
        // No receive object for this connection: drop it and free its slot.
        acceptedSocket.Close();
        this.MaxConnectionsEnforcer.Release();
        throw new InvalidOperationException("We starved the receive pool for objects, make sure it matches the max connections.");
    }

    ((Connection)recEventArgs.UserToken).Socket = acceptedSocket;
    StartReceive(recEventArgs);
}

/// <summary>
/// Completed-event handler attached to every send SocketAsyncEventArgs in Init;
/// forwards the finished send operation to ProcessSend.
/// </summary>
/// <param name="sender">The event source (unused).</param>
/// <param name="e">The send operation that completed.</param>
private void IO_SendCompleted(object sender, SocketAsyncEventArgs e)
{
    ProcessSend(e);
}

/// <summary>
/// Completed-event handler attached to every receive SocketAsyncEventArgs in Init;
/// forwards the finished receive operation to ProcessReceive.
/// </summary>
/// <param name="sender">The event source (unused).</param>
/// <param name="e">The receive operation that completed.</param>
private void IO_ReceiveCompleted(object sender, SocketAsyncEventArgs e)
{
    ProcessReceive(e);
}

/// <summary>
/// Posts an asynchronous receive on the socket stored in the Connection token of
/// <paramref name="receiveEventArgs"/>.
/// </summary>
/// <param name="receiveEventArgs">Receive object whose UserToken is a Connection.</param>
private void StartReceive(SocketAsyncEventArgs receiveEventArgs)
{
    Socket clientSocket = ((Connection)receiveEventArgs.UserToken).Socket;

    if (clientSocket.ReceiveAsync(receiveEventArgs))
    {
        // Pending: IO_ReceiveCompleted will call ProcessReceive.
        return;
    }

    // Completed synchronously: the Completed event is not raised, so handle it here.
    ProcessReceive(receiveEventArgs);
}

/// <summary>
/// Handles a completed receive: on success with data, posts the next receive;
/// otherwise closes the client socket and returns the receive object to its pool.
/// </summary>
/// <param name="receiveEventArgs">The receive operation that completed.</param>
private void ProcessReceive(SocketAsyncEventArgs receiveEventArgs)
{
    bool receivedData = receiveEventArgs.SocketError == SocketError.Success
        && receiveEventArgs.BytesTransferred > 0;

    if (!receivedData)
    {
        // Either the operation failed or zero bytes were transferred: tear the connection down.
        CloseClientSocket(receiveEventArgs);
        ReturnReceiveSaea(receiveEventArgs);
        return;
    }

    // TODO: consume the received bytes here (placeholder left by the original author:
    // "NEED TO ADD RECEIVE DATA HERE ETC").

    StartReceive(receiveEventArgs);
}

/// <summary>
/// Sends <paramref name="data"/> on <paramref name="socket"/> using an object from the send pool.
/// </summary>
/// <param name="socket">The connected socket to send on.</param>
/// <param name="data">The bytes to send.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="socket"/> or <paramref name="data"/> is null.
/// </exception>
/// <exception cref="InvalidOperationException">The send pool has no free object.</exception>
public void SendData(Socket socket, byte[] data)
{
    if (socket == null)
    {
        throw new ArgumentNullException("socket");
    }

    if (data == null)
    {
        throw new ArgumentNullException("data");
    }

    SocketAsyncEventArgs sendEventArgs;

    // Previously the TryPop result was ignored, so an empty pool surfaced as a
    // NullReferenceException on the next line.
    if (!this.PoolOfSendEventArgs.TryPop(out sendEventArgs))
    {
        throw new InvalidOperationException("We starved the send pool for objects, increase the number of send objects.");
    }

    SendDataToken token = (SendDataToken)sendEventArgs.UserToken;
    token.DataToSend = data;

    // StartSend and ProcessSend drive the transfer from these two counters, so set them
    // explicitly instead of relying on whatever state the pooled token was left in.
    token.BytesSentAlreadyCount = 0;
    token.SendBytesRemainingCount = data.Length;

    // The send path keeps its target socket in AcceptSocket (read back in StartSend).
    sendEventArgs.AcceptSocket = socket;
    StartSend(sendEventArgs);
}

/// <summary>
/// Copies the next chunk of the token's pending data into the send buffer and posts an
/// asynchronous send on the socket stored in AcceptSocket. A chunk is at most
/// Settings.BufferSize bytes.
/// </summary>
/// <param name="sendEventArgs">Send object whose UserToken is a SendDataToken.</param>
private void StartSend(SocketAsyncEventArgs sendEventArgs)
{
    SendDataToken pending = (SendDataToken)sendEventArgs.UserToken;

    // Send whatever is left, capped at the configured buffer size.
    int chunkLength = Math.Min(pending.SendBytesRemainingCount, this.Settings.BufferSize);

    sendEventArgs.SetBuffer(sendEventArgs.Offset, chunkLength);
    Buffer.BlockCopy(
        pending.DataToSend,
        pending.BytesSentAlreadyCount,
        sendEventArgs.Buffer,
        sendEventArgs.Offset,
        chunkLength);

    if (sendEventArgs.AcceptSocket.SendAsync(sendEventArgs))
    {
        // Pending: IO_SendCompleted will call ProcessSend.
        return;
    }

    // Completed synchronously: the Completed event is not raised, so handle it here.
    ProcessSend(sendEventArgs);
}

/// <summary>
/// Handles a completed send: continues with the next chunk while data remains, and
/// recycles the send object once everything was sent or the send failed.
/// </summary>
/// <param name="sendEventArgs">The send operation that completed.</param>
private void ProcessSend(SocketAsyncEventArgs sendEventArgs)
{
    SendDataToken token = (SendDataToken)sendEventArgs.UserToken;

    if (sendEventArgs.SocketError != SocketError.Success)
    {
        // Send objects carry a SendDataToken, not a Connection, so CloseClientSocket
        // (which casts UserToken to Connection) cannot be used here. Close the socket
        // kept in AcceptSocket directly instead.
        Socket failedSocket = sendEventArgs.AcceptSocket;

        token.Reset();
        sendEventArgs.AcceptSocket = null;
        ReturnSendSaea(sendEventArgs);

        if (failedSocket != null)
        {
            try
            {
                failedSocket.Shutdown(SocketShutdown.Both);
            }
            catch (SocketException)
            {
                // Best effort: the socket may already be disconnected.
            }
            catch (ObjectDisposedException)
            {
                // Best effort: the socket may already have been closed elsewhere.
            }

            // NOTE(review): Connection.OnConnectionClose is not reachable from a send object;
            // presumably the receive side of this socket runs CloseClientSocket — confirm.
            failedSocket.Close();
        }

        return;
    }

    token.SendBytesRemainingCount -= sendEventArgs.BytesTransferred;

    if (token.SendBytesRemainingCount == 0)
    {
        // Everything was sent: reset the token and recycle the send object.
        token.Reset();
        sendEventArgs.AcceptSocket = null;
        ReturnSendSaea(sendEventArgs);
        return;
    }

    // Partial progress: advance the source offset and send the next chunk.
    token.BytesSentAlreadyCount += sendEventArgs.BytesTransferred;
    StartSend(sendEventArgs);
}

/// <summary>
/// Shuts down and closes the socket held by the Connection token of <paramref name="args"/>,
/// then notifies the connection that it was closed.
/// </summary>
/// <param name="args">An object whose UserToken is a Connection.</param>
private void CloseClientSocket(SocketAsyncEventArgs args)
{
    Connection connection = (Connection)args.UserToken;
    Socket clientSocket = connection.Socket;

    try
    {
        clientSocket.Shutdown(SocketShutdown.Both);
    }
    catch (Exception)
    {
        // Deliberately ignored: shutdown is best effort, the socket is closed below regardless.
    }

    clientSocket.Close();
    connection.OnConnectionClose();
}

/// <summary>
/// Returns a receive object to the receive pool and releases one slot of the
/// max-connections semaphore.
/// </summary>
/// <param name="args">The receive object to recycle.</param>
private void ReturnReceiveSaea(SocketAsyncEventArgs args)
{
    // Push before Release so the receive object is back in the pool before the slot is freed.
    this.PoolOfRecEventArgs.Push(args);
    this.MaxConnectionsEnforcer.Release();
}

/// <summary>
/// Returns a send object to the send pool.
/// </summary>
/// <param name="args">The send object to recycle.</param>
private void ReturnSendSaea(SocketAsyncEventArgs args)
{
    this.PoolOfSendEventArgs.Push(args);
}

/// <summary>
/// Cleans up after an accept that could not be used: closes the socket attached to the
/// accept object (if any) and returns the object to the accept pool ready for reuse.
/// </summary>
/// <param name="acceptEventArgs">The accept object to clean up and recycle.</param>
private void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
    Socket badSocket = acceptEventArgs.AcceptSocket;

    // Detach the socket before pooling: a recycled accept object must not carry a closed
    // AcceptSocket into the next AcceptAsync call.
    acceptEventArgs.AcceptSocket = null;

    if (badSocket != null)
    {
        badSocket.Close();
    }

    this.PoolOfAcceptEventArgs.Push(acceptEventArgs);
}

/// <summary>
/// Stops listening and disposes all pooled SocketAsyncEventArgs objects.
/// </summary>
internal void Shutdown()
{
    // ListenSocket is only assigned in StartListen; tolerate Shutdown without a prior StartListen.
    if (this.ListenSocket != null)
    {
        try
        {
            this.ListenSocket.Shutdown(SocketShutdown.Receive);
        }
        catch (SocketException)
        {
            // A listening socket is not connected, so Shutdown may fail; that must not
            // prevent the socket from being closed and the pools from being disposed.
        }

        this.ListenSocket.Close();
    }

    DisposeAllSaeaObjects();
}

/// <summary>
/// Disposes the accept, send and receive SocketAsyncEventArgs pools, in that order.
/// </summary>
private void DisposeAllSaeaObjects()
{
    this.PoolOfAcceptEventArgs.Dispose();
    this.PoolOfSendEventArgs.Dispose();
    this.PoolOfRecEventArgs.Dispose();
}

用一个接收操作循环不断地接收数据,同时使用一个包含多个发送操作的池,这样可以吗?这样做的原因是我不想在接收之后立即发送数据。我想在需要的时候才发送数据,而接收操作则不同,我希望它循环不断地接收客户端的数据。这样做会快速有效吗?

现在我需要处理的唯一事情是用这个代码正确地接收数据并解码数据。

EN

回答 2

Code Review用户

发布于 2014-03-27 15:03:41

遗憾的是,以下只是一些观察结果,与您的主要关注点没有直接关系,但从代码审查的角度来看,希望仍然是有价值的意见:

  • 该类与其他几个类紧密耦合。这可能有问题,也可能没有问题,只是想指出一下--我认为实例化对象本身就是一个问题,我不喜欢在代码中到处都是new指令,所以如果可能的话,我会尝试将这些指令移出类之外。
  • Init()有太多的责任,我会从该代码中提取3个方法(每个循环一个)。
  • SocketAsyncEventArgs.UserToken 的类型是 object,这是为了方便起见。似乎您有时会在其中放置一个 Connection,而其他时候则是一个 SendDataToken。随着代码的维护和发展,这可能会引起混淆,并导致 InvalidCastException 的出现。

您可以创建一个完全公开所需内容的类,并且只将UserToken强制转换为该类型:

代码语言:javascript
复制
/// <summary>
/// Single user-token type that carries both the connection and the send state, so that
/// SocketAsyncEventArgs.UserToken can always be cast to the same type.
/// </summary>
public class CustomUserToken
{
    private readonly Connection _connection;
    private readonly SendDataToken _token;

    /// <summary>
    /// Creates a token wrapping the given connection and send state.
    /// </summary>
    /// <param name="connection">The connection exposed through <see cref="Connection"/>.</param>
    /// <param name="token">The send state exposed through <see cref="Token"/>.</param>
    // A constructor is declared without the 'class' keyword; the original line did not compile.
    public CustomUserToken(Connection connection, SendDataToken token)
    {
        _connection = connection;
        _token = token;
    }

    /// <summary>Gets the connection supplied at construction.</summary>
    public Connection Connection { get { return _connection; } }

    /// <summary>Gets the send state supplied at construction.</summary>
    public SendDataToken Token { get { return _token; } }
}

现在,您可以始终将 SocketAsyncEventArgs.UserToken 转换为 CustomUserToken,并从中获取 Connection 或 Token。

票数 5
EN

Code Review用户

发布于 2018-06-08 20:58:32

我注意到用于 SendAsync 的套接字是 SAEA 的 AcceptSocket,而对于 ReceiveAsync,您使用的是 UserToken 中的套接字。这两者能否使用同一个实例?

票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/8361

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档