我在Java中有一个简单的Kafka用户。如果没有卡夫卡经纪人的话,我想抓住一个例外。我需要它来打断线程。
我有这样的代码:
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties());
kafkaConsumer.subscribe(Arrays.asList(topic));
try {
ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
// records handling
} catch(Exception e) {
System.out.println(e.getMessage());
}finally {
kafkaConsumer.close();
}如果Kafka服务器关机,我不会捕获任何异常,但是日志中显示了以下消息:
18/03/28 13:33:39 WARN clients.NetworkClient: [Consumer clientId=consumer-3, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.
18/03/28 13:33:40 WARN clients.NetworkClient: [Consumer clientId=consumer-1, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.有办法在我的线上处理它吗?
发布于 2018-03-29 10:45:24
我已经解决了这个问题,有点笨拙,但它对我有用。
public boolean testSocket(String serversList) {
String[] sockets = serversList.split(",");
int unactive = 0;
for (int i = 0; i < sockets.length; i++) {
try {
String[] socket = sockets[i].split(":");
(new Socket(socket[0], Integer.valueOf(socket[1]))).close();
} catch (IOException e) {
unactive++;
}
}
if (unactive < sockets.length) return true;
return false;
}发布于 2018-03-28 11:19:29
像这样的东西可以起作用:
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup(); 1
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});您可以执行consumer.wakeup()来中断当前使用者的操作。
mainThread.join()被放入其中,以确保主线程在唤醒后不会在处理过程中被关闭。请记住,shutdownHook也负责处理中断,而不仅仅是普通的程序关闭。
https://stackoverflow.com/questions/49532645
复制相似问题