首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >对套接字的异步方法BeginSend的队列调用

对套接字的异步方法BeginSend的队列调用
EN

Stack Overflow用户
提问于 2014-05-04 12:02:27
回答 1查看 3.1K关注 0票数 2

我需要将BeginSend调用排队到套接字,并且需要按时间顺序执行它们。为了做到这一点,我使用了一个信号量,在回调函数清晰的时候发出信号。

大多数情况下,它可以工作,因为每个异步回调都是在一个分离的线程上执行的,但有时在新的异步调用中使用当前回调中使用的相同线程。当发生这种情况时,该线程将被锁定,等待信号量被释放,但是由于应该清除信号量的同一个线程正在等待它被清除,所以线程将被永久锁定。

为了说明这个问题,这里有一个测试代码:

代码语言:javascript
复制
static Semaphore semaphore = new Semaphore(1, 1);
static IList<byte[]> buffer = new List<byte[]>();
static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    socket.Connect(new IPEndPoint(new IPAddress(new byte[] { 192, 168, 1, 8 }), 123));
    while (true) // data feed
    {
        lock (buffer)
        {
            buffer.Add(new byte[1460]);
            if (buffer.Count == 1)
                socket.BeginSend(buffer[0], 0, 1460, 0, new AsyncCallback(SendCallback), socket); // calls BeginSend if the buffer was empty before 
        }
    }
}

static void SendCallback(IAsyncResult ar)
{
    Console.WriteLine("in " + Thread.CurrentThread.ManagedThreadId);
    semaphore.WaitOne();
    Socket socket = (Socket)ar.AsyncState;
    lock(buffer)
    {
        buffer.RemoveAt(0); // removes data that was sent
        if (buffer.Count > 0) // if there is more data to send calls BeginSend again
            socket.BeginSend(buffer[0], 0, 1460, 0, new AsyncCallback(SendCallback), socket);
    }
    semaphore.Release();
    Console.WriteLine("out " + Thread.CurrentThread.ManagedThreadId);
}

这是输出:

因为线程10被传输到一个新的回调,而没有给以前的回调退出和清除信号量的机会,所以线程将被永久锁定。

我应该如何处理这个问题?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-05-04 12:50:54

切换到任务:

尼斯msdn文章任务和APM模式

代码语言:javascript
复制
public Task<int> SendAsync(Socket socket, byte[] buffer, int offset, int size, SocketFlags flags)
{
   var result = socket.BeginSend(buffer, offset, size, flags, _ => { }, socket);
   return Task.Factory.FromAsync(result,(r) => socket.EndSend(r));
}

现在事情变得更容易了:

使用默认的BlockingCollection<>作为并发队列。它的线程保存并删除列表上的显式锁

代码语言:javascript
复制
static BlockingCollection<byte[]> buffer = new BlockingCollection<byte[]>();

public async void Main()
{
   Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
   socket.Connect(new IPEndPoint(new IPAddress(new byte[] { 192, 168, 1, 8 }), 123));
   while (!buffer.IsCompleted)
   {
      var data = buffer.Take();
      await SendAsync(socket, data, 0, data.Length, 0);
   }
   Console.ReadLine();            
}

在不需要信号量的情况下,保持非阻塞发送和命令。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23455992

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档