我使用的是spring cloud数据流。
@Bean
@InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2"))
public Supplier<Product> getProductSource(ProductBuilder dataAccess) {
return ()->dataAccess.getNext();
};如果kafka突然停机,那么我们如何停止这种轮询行为以防止数据丢失?
当我测试时,即使kafka宕机了,数据也会不断地从数据库中读取,并且不断地试图将记录发送到kafka?
预期的性能是在kafka宕机后停止数据轮询。
有没有可能实现这一点的方法?
发布于 2020-12-21 22:35:20
@InboundChannelAdapter的@Poller可以配置errorChannel
/**
* @return The the bean name of default error channel
* for the underlying {@code MessagePublishingErrorHandler}.
* @since 4.3.3
*/
String errorChannel() default "";因此,无论何时在TbeSource.PR1通道上流的下游发生异常,它都将被传送到提供的错误通道,以便在其上产生一些错误流。
在那里,您可以按照逻辑停止为该@InboundChannelAdapter和Supplier组合创建的SourcePollingChannelAdapter。在本例中,bean id类似于:[CONFIGURATION_CLASS_BEAN_NAME.getProductSource.inboundChannelAdapter]。更多信息请看这里:https://docs.spring.io/spring-integration/reference/html/configuration.html#annotations_on_beans。正如它所说的,您也可以使用@EndpointId来简化依赖注入例程的工作。
确保重新抛出异常,让DB事务回滚,以避免数据丢失!
https://stackoverflow.com/questions/65392068
复制相似问题