我有两个路线,打算过滤来自卡夫卡的用户信息,基于存储在咖啡因缓存中的用户in黑名单。
在启动时加载包含黑名单用户is的txt文件的第一个路由定义如下:
from(file(...))
.id(getRouteId())
.split().tokenize("\n")
.stopOnException()
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionTimeout(500)
.setHeader(CaffeineConstants.ACTION, constant(CaffeineConstants.ACTION_PUT))
.setHeader(CaffeineConstants.KEY, constant("blacklistedIds"))
.toF("caffeine-cache://%s", "blacklistedIds")注意:黑名单由一个List<String> blacklistedIds组成,它存储在咖啡因缓存中(在应用程序启动时初始化)。
从kafka (User properties : id,firstname,lastname)获取用户消息的第二条路径定义如下:
from(kafka(...))
.id(getRouteId())
.unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
// filter here (based on cache) to only let go authorized users
//.filter(method(UserMessageFilterService.class, "isAuthorizedUser"))
.to(output())我的问题是如何使用存储在咖啡因缓存中的blacklistIds来过滤传入的消息?如何在UserMessageFilterService#isAuthorizedUser bean的方法中获得缓存?是否有更好/简单的方法来实现这一点?
发布于 2022-06-20 15:19:40
可以将消息正文和任意标头传递给isAuthorizedUser()。
from(kafka(...))
...
// save the UserMessage in a header
.setHeader("userMessage", body())
// retrieve the cached list of blacklisted Ids
.setHeader(CaffeineConstants.CamelCaffeineAction, CaffeineConstants.GET)
.setHeader(CaffeineConstants.CamelCaffeineKey, constant("blacklistedIds"))
.to("caffeine-cache:blacklistedIds)
.filter(method(UserMessageFilterService.class, "isAuthorizedUser"))
// restore the original body
.setBody(header("userMessage"))UserMessageFilterService.java
public boolean isAuthorizedUser(List<String> blacklistedIds, @Header("userMessage") UserMessage userMessage) {
return !blacklistedIds.contains(userMessage.getUserId());
}https://stackoverflow.com/questions/72677929
复制相似问题