首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Quarkus + Kafka + Smallrye异常处理

Quarkus + Kafka + Smallrye异常处理
EN

Stack Overflow用户
提问于 2020-01-31 22:20:55
回答 1查看 1.6K关注 0票数 4

如何使用quarkus + kafka + smallrye处理流处理异常?

我的代码与quarkus guide (https://quarkus.io/guides/kafka#imperative-usage)上的命令式生成器示例非常相似

代码语言:javascript
复制
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(price);
    }
}

我想要一些类似于vanilla Kafka库的东西,它提供了处理请求发送的每个记录的回调的选项。

代码语言:javascript
复制
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        logger.info(record.toString());

        if (exception != null) {
            logger.error("Producer exception", exception);
        }
    }
});

Tks

EN

回答 1

Stack Overflow用户

发布于 2020-02-01 01:55:26

有一个section of the docs on Acknowlegement

代码语言:javascript
复制
@Incoming("i")
@Outgoing("j")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
  public CompletionStage<Message<String>> manualAck(Message<String> input) {
    return CompletableFuture.supplyAsync(input::getPayload)
      .thenApply(Message::of)
      .thenCompose(m -> input.ack().thenApply(x -> m));
  }
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60005697

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档