我有下面的“hello world”jeromq推拉客户端和服务器。只有在我设置高水位线值之后,我才能在不丢失消息的情况下进行传输。
import org.jeromq.ZMQ;
public class TestTcpServer {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PULL);
System.out.println("Binding TCP server on port 5555");
//socket.setRcvHWM(100_000);
socket.bind("tcp://*:5555");
int x;
x = 0;
while (true) {
x++;
byte[] raw = socket.recv(0);
String rawMessage = new String(raw);
if (x > 99_997) {
System.out.println(x);
System.out.println(rawMessage);
}
}
}
}
//client
import java.io.IOException;
import org.jeromq.ZMQ;
public class TestTcpClient {
/**
* @param args
* @throws InterruptedException
* @throws IOException
*/
public static void main(String[] args) throws InterruptedException,
IOException {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUSH);
socket.connect("tcp://localhost:5555");
//socket.setRcvHWM(100_000);
System.out.println("Sending 100 000 transactions over TCP..."); long start = System.currentTimeMillis();
for (int request_nbr = 0; request_nbr != 100_000; request_nbr++) {
String requestString = "message";
byte[] request = requestString.getBytes();
boolean success = socket.send(request, 0);
if (!success) {
System.out.println("sending message failed!");
}
}
long end = System.currentTimeMillis();
System.out.print("Time: ");
System.out.print(end - start);
System.out.println(" ms");
socket.close();
context.term();
}
}根据0Mq documentation,只有PUB套接字在达到高水位线时才会丢弃消息。由于我使用的是推拉套接字,为什么消息会被丢弃?
对我来说,HWM似乎是系统的一个动态属性,所以当我能够在hello world示例中解决丢弃消息的问题时,我想知道在现实世界的情况下,我是否可以依靠jeromq不丢弃消息?
发布于 2012-11-11 22:01:13
感谢您的报道。
可以,PUSH/PULL不能丢弃消息。
在最新的支持ZMTP 2.0的leap上有一个bug。
请尝试使用最新的快照。
另外,我建议您在发送消息后想要退出应用程序时,在客户端添加socket.setLinger(some_milli_seconds)。
发布于 2013-06-14 14:55:39
在JeroMQ 0.3中应该可以正常工作,我在这里做了一个简单的基准测试http://nguyentantrieu.info/blog/benchmark-pubsub-jeromq-java/
https://stackoverflow.com/questions/13221776
复制相似问题