我正在使用Flowable.generate构建一个Kafka consumer Flowable,下面是我的实现
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class RxKafkaUtils {
/**
* Returns a flowable impl for a Kafka Consumer
*/
public static @NonNull Flowable < Object > source(Properties props, String[] topics) {
final
var consumer = new KafkaConsumer(props);
return Flowable.generate(
() -> {
consumer.subscribe(Arrays.asList(topics));
return ConsumerRecords.empty().iterator();
},
(state, emitter) -> {
if (state.hasNext()) {
emitter.onNext(state.next());
return state;
} else {
try {
return consumer.poll(1000).iterator();
} catch (Throwable t) {
consumer.close();
emitter.onError(t);
throw t; // <------ HERE
}
}
});
}
}因为KafkaConsumer可以抛出异常,所以我将.poll封装在一个try-catch中。
我的疑问是,我应该重新抛出Throwable吗?如果没有,编译器将从catch块中请求一个返回值,在本例中,我实际上没有任何返回值
实现这一点的正确方法是什么(换句话说-优雅关闭)?
对于这一点,Flowable.generate甚至是正确的选择吗?我尝试使用Flowable.fromPublisher,但是状态管理非常混乱,或者我做错了
发布于 2021-06-06 22:35:58
好吧,事实证明,
Flowable.generate接受在onComplete/onError之后调用的disposeState消费者。当前状态(或者我返回的最终状态被传递给disposeState使用者)
https://stackoverflow.com/questions/67860131
复制相似问题