首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Flowable.generate RxJava处理错误

使用Flowable.generate RxJava处理错误
EN

Stack Overflow用户
提问于 2021-06-06 22:24:08
回答 1查看 58关注 0票数 0

我正在使用Flowable.generate构建一个Kafka consumer Flowable,下面是我的实现

代码语言:javascript
复制
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class RxKafkaUtils {

  /**
   * Returns a flowable impl for a Kafka Consumer
   */
  public static @NonNull Flowable < Object > source(Properties props, String[] topics) {
    final
    var consumer = new KafkaConsumer(props);
    return Flowable.generate(
      () -> {
        consumer.subscribe(Arrays.asList(topics));
        return ConsumerRecords.empty().iterator();
      },
      (state, emitter) -> {
        if (state.hasNext()) {
          emitter.onNext(state.next());
          return state;
        } else {
          try {
            return consumer.poll(1000).iterator();
          } catch (Throwable t) {
            consumer.close();
            emitter.onError(t);
            throw t;           // <------ HERE
          }
        }
      });
  }
}

因为KafkaConsumer可以抛出异常,所以我将.poll封装在一个try-catch中。

我的疑问是,我应该重新抛出Throwable吗?如果没有,编译器将从catch块中请求一个返回值,在本例中,我实际上没有任何返回值

实现这一点的正确方法是什么(换句话说-优雅关闭)?

对于这一点,Flowable.generate甚至是正确的选择吗?我尝试使用Flowable.fromPublisher,但是状态管理非常混乱,或者我做错了

EN

回答 1

Stack Overflow用户

发布于 2021-06-06 22:35:58

好吧,事实证明,

Flowable.generate接受在onComplete/onError之后调用的disposeState消费者。当前状态(或者我返回的最终状态被传递给disposeState使用者)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67860131

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档