我有一个使用kafka绑定的spring-cloud-stream worker
@Slf4j
@EnableBinding(KafkaStreamsProcessor.class)
@RequiredArgsConstructor
public class SomeWorker {
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public KStream<?, Obj> process(KStream<?, Obj> objStream) {
return objStream.something();
}
}和一个全局拦截器
@Component
@Slf4j
@GlobalChannelInterceptor
public class StreamInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> msg, MessageChannel mc) {
log.info("In preSend");
return msg;
}
@Override
public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
log.info("In postSend");
}
@Override
public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
log.info("In afterSendCompletion");
}
@Override
public boolean preReceive(MessageChannel mc) {
log.info("In preReceive");
return true;
}
@Override
public Message<?> postReceive(Message<?> msg, MessageChannel mc) {
log.info("In postReceive");
return msg;
}
}在接收到流上的任何消息时,不会调用GlobalChannelInterceptor。
我错过了什么?
发布于 2019-06-01 01:08:00
Kafka Streams绑定器不是基于MessageChannel的,因此没有通道可供拦截。
https://stackoverflow.com/questions/56398499
复制相似问题