我想知道是否可能(默认情况下)为从特定主题接收到的特定批消息添加聚合。
描述:我有球员正在寻找一个大厅,这是自动创建。这个想法是玩家运行一个搜索游戏和消息的{ id,mod,map }被发送到主题。每个mod/map包含一个规则,允许多少玩家加入,并且基于这个数量,我试图聚集这些玩家。当大厅(游戏)的状态看起来已经准备好启动一个大厅(10/10玩家)时,我们应该将细节转换为一条消息,并将其进一步推送到处理中,同时聚合应该刷新并从0开始。
KStream<String, List<String>> gameLobbyEventStream =
streamsBuilder.stream(
"GAME_MATCHMAKING_LOBBY_EVENT-TOPIC",
Consumed.with(Serdes.String(), Serdes.serdeFrom(String.class)))
.peek((k, v) -> log.info("key:{}, value:{}", k, v))
.groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(
ArrayList::new,
(k, v, agg) -> {
agg.add(v);
return agg;
},
Materialized.with(
Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String())))
.toStream();
gameLobbyEventStream
.filter((k, v) -> isReadyToStartLobby(v))
.peek((k, v) -> log.info("lobby is creating for {}", v))
// convert to lobby { id, players, details }
.to("LAUNCH_MATCHMAKING_LOBBY_EVENT-TOPIC");#真实情况:模式:死亡比赛(3x3:6允许玩家)
-> player-1 clicked to find a game
-> player-2 clicked to find a game
-> player-3 clicked to find a game
-> player-4 clicked to find a game
-> player-5 clicked to find a game
-> player-6 clicked to find a game
-> lobby has been created for players above {aggregation finished}
-> player-7 clicked to find a game
-> player-8 clicked to find a game
-> {aggregation status: **2** of **6** to create new lobby}#注:没有时间窗口。如果球员想找到一个大厅,我必须让他尽可能多地等待,直到他取消搜索。
我是聚合到List<>,并希望创建一个游说时,聚合(列表)大小将等于预期的球员数量为国防部选择。
我面临的问题是聚合在没有重置的情况下一次又一次地递增,我看到数以百计的玩家等待游说开始。我期望的行为类似于块,其中的球员被推到更多的游说和处理,同时,下一块应该聚集玩家从零到预期的球员数量再次作为新的游说。
发布于 2022-03-28 20:10:54
下面是处理玩家列表的预期聚合的代码。这里的问题是,我们可能需要停止对特定玩家的搜索(他发送了一个事件来停止搜索),这增加了拒绝您最近加入的游说团体的难度。(如果我们通过弹出式菜单通知客户,并接受/取消大厅作为批准这些已经在大厅的玩家的决定的最终决定,则可以选择管理该案件。)
.aggregate(ArrayList::new,
(k, v, agg) -> {
if (agg.size() == 5) {
return new ArrayList<>(List.of(v));
}
agg.add(v);
return agg;
}https://stackoverflow.com/questions/71606764
复制相似问题