首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KStream聚合固定列表(有限大小)

KStream聚合固定列表(有限大小)
EN

Stack Overflow用户
提问于 2022-03-24 17:17:10
回答 1查看 129关注 0票数 0

我想知道是否可能(默认情况下)为从特定主题接收到的特定批消息添加聚合。

描述:我有球员正在寻找一个大厅,这是自动创建。这个想法是玩家运行一个搜索游戏和消息的{ id,mod,map }被发送到主题。每个mod/map包含一个规则,允许多少玩家加入,并且基于这个数量,我试图聚集这些玩家。当大厅(游戏)的状态看起来已经准备好启动一个大厅(10/10玩家)时,我们应该将细节转换为一条消息,并将其进一步推送到处理中,同时聚合应该刷新并从0开始。

代码语言:javascript
复制
KStream<String, List<String>> gameLobbyEventStream =
    streamsBuilder.stream(
            "GAME_MATCHMAKING_LOBBY_EVENT-TOPIC",
            Consumed.with(Serdes.String(), Serdes.serdeFrom(String.class)))
        .peek((k, v) -> log.info("key:{}, value:{}", k, v))
        .groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.String()))
        .aggregate(
            ArrayList::new,
            (k, v, agg) -> {
              agg.add(v);
              return agg;
            },
            Materialized.with(
                Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String())))
        .toStream();

gameLobbyEventStream
    .filter((k, v) -> isReadyToStartLobby(v))
    .peek((k, v) -> log.info("lobby is creating for {}", v))
    // convert to lobby { id, players, details }
    .to("LAUNCH_MATCHMAKING_LOBBY_EVENT-TOPIC");

#真实情况:模式:死亡比赛(3x36允许玩家)

代码语言:javascript
复制
 -> player-1 clicked to find a game
 -> player-2 clicked to find a game   
 -> player-3 clicked to find a game  
 -> player-4 clicked to find a game  
 -> player-5 clicked to find a game
 -> player-6 clicked to find a game

 -> lobby has been created for players above {aggregation finished}

 -> player-7 clicked to find a game
 -> player-8 clicked to find a game
 -> {aggregation status: **2** of **6** to create new lobby}

#注:没有时间窗口。如果球员想找到一个大厅,我必须让他尽可能多地等待,直到他取消搜索。

我是聚合到List<>,并希望创建一个游说时,聚合(列表)大小将等于预期的球员数量为国防部选择。

我面临的问题是聚合在没有重置的情况下一次又一次地递增,我看到数以百计的玩家等待游说开始。我期望的行为类似于块,其中的球员被推到更多的游说和处理,同时,下一块应该聚集玩家从零到预期的球员数量再次作为新的游说。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-03-28 20:10:54

下面是处理玩家列表的预期聚合的代码。这里的问题是,我们可能需要停止对特定玩家的搜索(他发送了一个事件来停止搜索),这增加了拒绝您最近加入的游说团体的难度。(如果我们通过弹出式菜单通知客户,并接受/取消大厅作为批准这些已经在大厅的玩家的决定的最终决定,则可以选择管理该案件。)

代码语言:javascript
复制
.aggregate(ArrayList::new,
    (k, v, agg) -> {
        if (agg.size() == 5) {
          return new ArrayList<>(List.of(v)); 
        }
       agg.add(v);
      return agg; 
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71606764

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档