首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将Kafka与应用程序解耦

将Kafka与应用程序解耦
EN

Stack Overflow用户
提问于 2018-09-20 00:52:13
回答 1查看 182关注 0票数 0

我有一个应用程序,它接收大量的GET请求(5分钟内大约有250000个)。应用程序解析查询参数并发布到Kafka。要发布的代码如下:

代码语言:javascript
复制
public class KafkaProcessor {

  private static final String BATCH_SIZE = "batch.size";
  private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
  private static final String PRODUCER_TYPE = "producer.type";
  private static final String VALUE_SERIALIZER = "value.serializer";
  private static final String KEY_SERIALIZER = "key.serializer";
  private static final String METADATA_BROKER_LIST = "bootstrap.servers";
  private static final String MAX_BLOCK_MS = "max.block.ms";
  private static final String KAFKA_ENABLED = "enabled";

  private static Properties props = new Properties();
  private static KafkaProducer<String, String> producer;
  private static ProducerRecord<String, String> producerRecord;
  private static String topic;


  static {
    boolean isEnabled = Boolean.parseBoolean(ResourceProps.INSTANCE.getKafkaProps(KAFKA_ENABLED));
    if (isEnabled) {
      //Setting up a producer configuration.
      props.put(METADATA_BROKER_LIST, "x.x.x.x:9092,y.y.y.y:9092");
      props.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
      props.put(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
      props.put(PRODUCER_TYPE, "async");
      props.put(REQUEST_REQUIRED_ACKS, "1");
      props.put(BATCH_SIZE, "1000");
      props.put(MAX_BLOCK_MS, "10000");
      producer = new KafkaProducer<>(props);
      topic = "pixel-server";
    }
  }


  private static void publishToKafka(JSONObject data) {
      producerRecord = new ProducerRecord<String, String>(topic, data.toString());
      producer.send(producerRecord, new Callback() {
        @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
          if (exception != null) {
            exception.printStackTrace();
          }
        }
      });
  }
}

我的应用程序托管在AWS实例中。Kafka服务器也在另一台AWS机器上。

但是,如果kafka宕机了,或者如果kafka由于任何原因而响应缓慢,那么我的应用程序就会冻结,无法进一步处理任何请求。我想知道如何使我的应用程序独立于Kafka,也就是说,如果kafka宕机(或响应缓慢),那么它不应该影响我的应用程序。

我尝试了几种方法,比如如果kafka给出一个超时,然后计算超时异常的数量,并停止发布到kafka,但由于请求量非常大,所以当我收到任何超时异常时,我的应用程序就会冻结。

任何帮助或指针都将不胜感激。

我使用的是Kafka 0.8.2。我的服务器在Vertx。Ubuntu中使用的操作系统。ulimit设置为max。

EN

回答 1

Stack Overflow用户

发布于 2018-09-20 17:52:10

假设您的Kafka集群中有三个或更多节点(这对于任何高负载应用程序都至关重要),您可以尝试一些技巧:

  1. 尝试将acks生产者配置设置为0。这将影响应用程序的一致性(有些消息可能会在生产者端被丢弃,并将永远丢失)。文档中写道:

如果设置为零,那么生产者根本不会等待来自服务器的任何确认。该记录将被立即添加到套接字缓冲区,并被视为已发送。不能保证服务器已收到记录,在本例中为

  1. max.block.ms生产者配置设置为0。这将导致您的应用程序在每次发送到Kafka集群时立即抛出TimeoutException,没有任何阻塞,但仅在内存缓冲区溢出时抛出。请注意,它只影响客户端阻塞,而不影响网络调用!
  2. request.timeout.ms减少到较小的值(如10100)。这将导致Kafka客户端在任何比request.timeout.ms耗时更长的网络操作上抛出TimeoutException。

还有更多的建议:

  1. 将您的Kafka实例更新到最新版本以获得更稳定的集群;
  2. 要实现高可用,您的Kafka集群必须至少包含三个节点(并且始终包含奇数个节点以避免split-brain condition)
  3. you应尝试使用max.batch.sizelinger.ms生产者配置以达到应用程序的最佳延迟吞吐比
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52410769

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档