I have exposed one Kafka node and one topic name. My web server receives a large volume of HTTP request data; I process that data and then push it to Kafka. The problem: if the Kafka node goes down, my server keeps ingesting data anyway, which blows up its memory and eventually crashes the server.
If Kafka is down, I want to stop publishing data. My Java sample code is below:
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Produce {

    static Producer<String, String> producer;

    Produce() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        // "acks" is the new-producer name for the old "request.required.acks".
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("batch.size", "1000");
        // "enabled", "kafka-topic" and "producer.type" are not valid
        // KafkaProducer configs (the new producer is always async), so
        // they were dropped here.
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String[] args) {
        Produce produce = new Produce();
        produce.send(producer, "pixel-server", "Some time");
    }

    // This method is called a lot of times
    public void send(Producer<String, String> producer, String topic, String data) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
        Future<RecordMetadata> response = producer.send(producerRecord, (metadata, exception) -> {
            if (null != exception) {
                exception.printStackTrace();
            } else {
                System.out.println("Done");
            }
        });
    }
}

I have only abstracted out some sample code. The send method is called many times. If Kafka is down, I simply want to stop sending any messages. What is an efficient way to handle this situation?
https://stackoverflow.com/questions/49496934