我有个问题我设法解决了..。然而,我有点担心,因为我真的不明白为什么解决办法有效;
我使用的是NetMQ,特别是具有许多套接字的NetMQ poller,其中一个是REP对。
我有一个请求队列,这些请求被排入请求中,服务器根据需要处理每种请求类型,并返回一个适当的响应。这是没有问题的,然而,当我试图添加一个额外的请求类型时,系统将按预期停止工作;会发生的情况是,请求将到达服务器,服务器将发送响应…客户也不会收到。在服务器关闭之前,客户端将不会接收消息(异常行为!)。
在发送请求之前,我使用设置的标志管理request对,并在收到回复时重置。我只能在replies的"SendReady“事件中触发回复来解决这个问题--这自动地修复了我所有的问题,但是我在文档中找不到任何东西来告诉我为什么套接字可能没有处于”send战备状态“,或者它实际上做了什么。
如果能提供任何信息,说明为什么这样做现在才能奏效,那就太好了:)
干杯。
编辑:来源
客户端:
“订阅”作为UI的单独线程运行。
private void Subscribe(string address)
{
using (var req = new RequestSocket(address + ":5555"))
using (var sub = new SubscriberSocket(address + ":5556"))
using (var poller = new NetMQPoller { req, sub })
{
// Send program code when a request for a code update is received
sub.ReceiveReady += (s, a) =>
{
var type = sub.ReceiveFrameString();
var reply = sub.ReceiveFrameString();
switch (type)
{
case "Type1":
manager.ChangeValue(reply);
break;
case "Type2":
string[] args = reply.Split(',');
eventAggregator.PublishOnUIThread(new MyEvent(args[0], (SimObjectActionEventType)Enum.Parse(typeof(MyEventType), args[1])));
break;
}
};
req.ReceiveReady += Req_ReceiveReady;
poller.RunAsync();
sub.Connect(address + ":5556");
sub.SubscribeToAnyTopic();
sub.Options.ReceiveHighWatermark = 10;
reqQueue = new Queue<string[]>();
reqQueue.Enqueue(new string[] { "InitialiseClient", "" });
req_sending = false;
while (programRunning)
{
if (reqQueue.Count > 0 && !req_sending)
{
req_sending = true;
string[] request = reqQueue.Dequeue();
Console.WriteLine("Sending " + request[0] + " " + request[1]);
req.SendMoreFrame(request[0]).SendFrame(request[1]);
}
Thread.Sleep(1);
}
}
}
private void Req_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
var req = e.Socket;
var messageType = req.ReceiveFrameString();
Console.WriteLine("Received {0}", messageType);
switch (messageType)
{
case "Reply1":
// Receive action
break;
case "Reply2":
// Receive action
break;
case "Reply3":
// Receive action
break;
}
req_sending = false;
}服务器:
using (var rep = new ResponseSocket("@tcp://*:5555"))
using (var pub = new PublisherSocket("@tcp://*:5556"))
using (var beacon = new NetMQBeacon())
using (var poller = new NetMQPoller { rep, pub, beacon })
{
// Send program code when a request for a code update is received
rep.ReceiveReady += (s, a) =>
{
var messageType = rep.ReceiveFrameString();
var message = rep.ReceiveFrameString();
Console.WriteLine("Received {0} - Content: {1}", messageType, message);
switch (messageType)
{
case "InitialiseClient":
// Send
rep.SendMoreFrame("Reply1").SendFrame(repData);
break;
case "Req2":
// do something
rep.SendMoreFrame("Reply2").SendFrame("RequestOK");
break;
case "Req3":
args = message.Split(',');
if (args.Length == 2)
{
// Do Something
rep.SendMoreFrame("Reply3").SendFrame("RequestOK");
}
else
{
rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
}
break;
case "Req4":
args = message.Split(',');
if (args.Length == 2)
{
requestData = //do something
rep.SendMoreFrame("Reply4").SendFrame(requestData);
}
else
{
rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
}
break;
default:
rep.SendMoreFrame("Ack").SendFrame("Error");
break;
}
};
// setup discovery beacon with 1 second interval
beacon.Configure(5555);
beacon.Publish("server", TimeSpan.FromSeconds(1));
// start the poller
poller.RunAsync();
// run the simulation loop
while (serverRunning)
{
// todo - make this operate for efficiently
// push updated variable values to clients
foreach (string[] message in pubQueue)
{
pub.SendMoreFrame(message[0]).SendFrame(message[1]);
}
pubQueue.Clear();
Thread.Sleep(2);
}
poller.StopAsync();
}发布于 2016-06-17 14:57:21
您正在使用来自多个线程的请求套接字,这是不支持的。您正在发送主线程和接收轮询线程。
您可以通过UI线程将其添加到轮询器和队列中,而不是使用常规队列尝试使用NetMQQueue。然后在轮询线程和接收端进行发送。
您可以在这里阅读文档:http://netmq.readthedocs.io/en/latest/queue/
发布于 2016-06-14 07:48:01
我唯一能想到的是,REP套接字只在您完全接收到消息(所有部分)之后才准备发送。
https://stackoverflow.com/questions/37804936
复制相似问题