在下面的代码中,我使用netmq推/拉套接字发送和接收100000条消息。我首先尝试在我的拉套接字上使用ReceiveFrameString (ReceiveSimple方法)执行一个简单的阻塞调用,然后尝试使用轮询器进行同样的操作(ReceiveWithPoller方法)。
使用轮询器会对发送/接收消息所需的时间产生重大影响。我试图弄清楚为什么自己使用dotTrace,我发现很多时间都在等待Socket.Select的执行。
有人能确认或解释这种差异吗?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace PushPull
{
class Program
{
static int messageReceived = 0;
const int sampleCount = 100 * 1000;
static void Main(string[] args)
{
// Create psu socket
PushSocket pushSocket = new PushSocket();
pushSocket.Bind("tcp://localhost:5555");
// Create pull socket
PullSocket pullSocket = new PullSocket();
pullSocket.Connect("tcp://localhost:5555");
Console.WriteLine("Ready...press any key to start");
Console.ReadKey();
Console.WriteLine();
// Start sending
Task.Run(() =>
{
for (int i = 0; i < sampleCount; i++)
{
pushSocket.SendFrame(Encoding.UTF8.GetBytes("ping"));
}
});
Stopwatch sw = Stopwatch.StartNew();
//ReceiveSimple(pullSocket);
ReceiveWithPoller(pullSocket);
// Display result.
sw.Stop();
Console.WriteLine();
Console.WriteLine("{0} message exchanged in {1} msecs", sampleCount, sw.Elapsed.TotalMilliseconds);
}
private static void ReceiveSimple(PullSocket pullSocket)
{
messageReceived = 0;
do
{
pullSocket.ReceiveFrameString();
} while (!HandleMessage());
}
private static void ReceiveWithPoller(PullSocket pullSocket)
{
NetMQPoller poller = new NetMQPoller();
poller.Add(pullSocket);
pullSocket.ReceiveReady += (sender, eventArgs) =>
{
if (HandleMessage())
{
poller.Stop();
}
};
poller.Run();
}
private static bool HandleMessage()
{
messageReceived++;
if (messageReceived % 10000 == 0)
{
Console.WriteLine("10k");
}
return messageReceived == sampleCount;
}
}
}发布于 2016-05-05 14:07:11
当您处理大量消息时,Poller是很昂贵的,但是有一个非常简单的解决方案。
当收到就绪事件时,可以使用Try*方法获取队列中的所有消息,在示例中如下所示:
pullSocket.ReceiveReady += (sender, eventArgs) =>
{
string message;
while (pullSocket.TryReceiveFrameString(out message)
{
if (HandleMessage())
{
poller.Stop();
}
}
};https://stackoverflow.com/questions/37051913
复制相似问题