首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >全局状态存储不创建更改日志主题,如果全局存储的输入主题有空键,那么解决方法是什么?

全局状态存储不创建更改日志主题,如果全局存储的输入主题有空键,那么解决方法是什么?
EN

Stack Overflow用户
提问于 2020-03-10 08:07:42
回答 1查看 410关注 0票数 0

我读过很多关于全局状态存储的文章,它没有为还原创建更改主题,而是使用源主题作为还原。

我正在创建自定义密钥并将数据存储在全局状态存储中,但是重新启动后它将消失,因为恢复上的全局存储将直接从源主题获取数据并绕过处理器。

我的输入主题有上面的数据。

代码语言:javascript
复制
{
      "id": "user-12345",
      "user_client": [
        "clientid-1",
        "clientid-2"
      ]
} 

我经营着两家国营商店,具体如下:

json)

  • clientid-1:"user-12345"

  • clientid-2:"user-12345"

  1. id ->record (记录方式)

因此,我已经看到解决办法是创建一个自定义的更改日志主题,并发送带有该主题键的数据,该主题将充当全局状态存储的源主题。

但是在我的场景中,我必须在州立商店填写两张记录,什么是最好的方法。

示例场景:

代码语言:javascript
复制
Record1: {
          "id": "user-1",
          "user_client": [
            "clientid-1",
            "clientid-2"
          ]
    } 



 Record2:{
          "id": "user-2",
          "user_client": [
            "clientid-1",
            "clientid-3"
          ]
    } 

全局状态存储应该有:

代码语言:javascript
复制
id -> json Record'

clientid-1: ["user-1", "user-2"]
clientid-2: ["user-2"]
clientid-3: ["user-2"]

如何在全局状态存储中维护上述方案的还原情况

EN

回答 1

Stack Overflow用户

发布于 2020-03-10 15:42:25

一种方法是为GlobalKTable维护一个变更主题(有GlobalKTable),让我们称它为user_client_global_ktable_changelog,为了简单起见,假设我们将消息序列化为java类(您可以只使用HashMap或JsonNode之类的):

代码语言:javascript
复制
//initial message format
public class UserClients {
    String id;
    Set<String> userClient;
}
//message when key is client
public class ClientUsers {
    String clientId;
    Set<String> userIds;
}
//your initial topic
KStream<String, UserClients> userClientKStream = streamsBuilder.stream("un_keyed_topic");

  1. 很容易将记录重新键到user_id,只需重新键KStream,然后将其发送到输出主题

代码语言:javascript
复制
//re-map initial message to user_id:{inital_message_payload}
userClientKStream
        .map((defaultNullKey, userClients) -> KeyValue.pair(userClients.getId(), userClients))
        .to("user_client_global_ktable_changelog");//please provide appropriate serdes

  1. 聚合user_id对于特定的客户端,我们可以使用本地状态(KTable)来保持(当前client_id的当前user_ids列表):

代码语言:javascript
复制
userClientKStream
        //will cause data re-partition before running groupByKey (will create an internal -repartition topic)
        .flatMap((defaultNullKey, userClients)
                -> userClients.getUserClient().stream().map(clientId -> KeyValue.pair(clientId, userClients.getId())).collect(Collectors.toList()))
        //we have to maintain a current aggregated store for user_ids for a particular client_id
        .groupByKey()
        .aggregate(ClientUsers::new, (clientId, userId, clientUsers) -> {
            clientUsers.getUserIds().add(userId);
            return clientUsers;
        }, Materialized.as("client_with_aggregated_user_ids"))
        .toStream()
        .to("user_client_global_ktable_changelog");//please provide appropriate serdes

例如,用于在本地州聚合user_ids:

代码语言:javascript
复制
//re-key message for client-based message
clientid-1:user-1
//your current aggregated for `clientid-1`
"clientid-1"
{
    "user_id": ["user-1"]
}

//re-key message for client-based message
clientid-1:user-2
//your current aggregated for `clientid-1`
"clientid-1"
{
    "user_id": ["user-1", "user-2"]
}

实际上,如果您做了一些更改(即主题your_application-client_with_aggregated_user_ids-changelog ),我们可以直接使用本地状态的changelog主题作为GlobalKTable的changelog,方法是通过调整状态来保持用户密钥和客户端密钥消息的有效负载。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60613596

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档