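I've read many articles about global state stores: a global store does not create a changelog topic for restoration and instead uses its source topic for restoration.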
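I am creating custom keys and storing the data in a global state store, but after a restart that data disappears, because on restore the global store reads data directly from the source topic and bypasses the processor.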
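To make this failure mode concrete, here is a stdlib-only Java simulation. It involves no Kafka and is not Kafka's actual restore code. `processAll` plays the role of the custom processor that adds client-keyed entries. `restoreFromSource` copies source records straight into the store, as the global store restore described above does. All class and method names are hypothetical, and treating the source records as keyed by user id is an assumption made to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only simulation (no Kafka): processing derives extra clientId keys,
// but restoration copies source records verbatim and never runs that
// processing, so the derived keys are missing after a restore.
public class GlobalRestoreBypassDemo {

    // One source-topic record; keying it by user id is an assumption of this sketch.
    static final class SourceRecord {
        final String key;
        final List<String> clients;
        SourceRecord(String key, List<String> clients) {
            this.key = key;
            this.clients = clients;
        }
    }

    static List<SourceRecord> sampleTopic() {
        List<SourceRecord> topic = new ArrayList<>();
        topic.add(new SourceRecord("user-1", List.of("clientid-1", "clientid-2")));
        topic.add(new SourceRecord("user-2", List.of("clientid-1", "clientid-3")));
        return topic;
    }

    // Normal processing: store the record under its id and also add derived
    // clientId -> "userId,userId,..." entries (the custom keys).
    static Map<String, String> processAll(List<SourceRecord> topic) {
        Map<String, String> store = new TreeMap<>();
        for (SourceRecord r : topic) {
            store.put(r.key, String.join(",", r.clients));
            for (String clientId : r.clients) {
                store.merge(clientId, r.key, (old, added) -> old + "," + added);
            }
        }
        return store;
    }

    // Restoration the way a global store does it: put the source records
    // into the store as-is, bypassing processAll().
    static Map<String, String> restoreFromSource(List<SourceRecord> topic) {
        Map<String, String> store = new TreeMap<>();
        for (SourceRecord r : topic) {
            store.put(r.key, String.join(",", r.clients));
        }
        return store;
    }

    public static void main(String[] args) {
        List<SourceRecord> topic = sampleTopic();
        System.out.println("after processing: " + processAll(topic));
        System.out.println("after restore:    " + restoreFromSource(topic));
    }
}
```

After processing, the store holds 2 user keys and 3 client keys. After the simulated restore, only the 2 user keys remain.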
My input topic has the following data:
{
"id": "user-12345",
"user_client": [
"clientid-1",
"clientid-2"
]
}

I run two state stores: one keyed by id (id -> json) and one keyed by client id.
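I have seen that the usual solution is to create a custom changelog topic and send the data, already carrying the desired keys, to that topic, which then acts as the source topic for the global state store.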
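Here is a sketch of why this workaround survives restarts. It is again stdlib-only and rests on stated assumptions. The records are already keyed as the store needs them, so replaying them requires no processor. Simplified log compaction, which keeps only the newest value per key and ignores tombstones, leaves the restored state unchanged. `Rec`, `compact` and `restore` are hypothetical names, and the string values stand in for real serialized payloads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch: a pre-keyed changelog can be replayed into the store
// without any processor, and compaction does not change the restored state.
public class CompactedChangelogDemo {

    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Records already keyed the way the store needs them (values simplified to strings).
    static List<Rec> sampleChangelog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("user-1", "[clientid-1, clientid-2]"));
        log.add(new Rec("clientid-1", "[user-1]"));
        log.add(new Rec("clientid-2", "[user-1]"));
        log.add(new Rec("user-2", "[clientid-1, clientid-3]"));
        log.add(new Rec("clientid-1", "[user-1, user-2]"));
        log.add(new Rec("clientid-3", "[user-2]"));
        return log;
    }

    // Simplified log compaction: keep only the last record per key, in log order.
    // Tombstones (null values) are not modeled here.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key); // move the key to the position of its newest record
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }

    // Restore = replay records into the store verbatim; no processing involved.
    static Map<String, String> restore(List<Rec> log) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : log) {
            store.put(r.key, r.value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Rec> log = sampleChangelog();
        System.out.println("full log size:       " + log.size());
        System.out.println("compacted log size:  " + compact(log).size());
        System.out.println("same restored state: " + restore(log).equals(restore(compact(log))));
    }
}
```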
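But in my scenario I have to populate the state store with two kinds of records (keyed by user id and keyed by client id). What is the best approach?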
Example scenario:
Record1: {
"id": "user-1",
"user_client": [
"clientid-1",
"clientid-2"
]
}
Record2: {
"id": "user-2",
"user_client": [
"clientid-1",
"clientid-3"
]
}

The global state store should contain:
id -> json Record
clientid-1: ["user-1", "user-2"]
clientid-2: ["user-1"]
clientid-3: ["user-2"]

How do I keep this content in the global state store across restores?
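The client-keyed entries are simply an inversion of the user-keyed records. Below is a minimal stdlib-only sketch, with hypothetical names, that derives them from Record1 and Record2. Sorted collections are used only to make the output deterministic.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Stdlib-only sketch: derive the client-keyed view (clientId -> user ids)
// from the user-keyed records (id -> client ids) of the example scenario.
public class ExpectedGlobalStoreDemo {

    static Map<String, List<String>> sampleRecords() {
        Map<String, List<String>> records = new LinkedHashMap<>();
        records.put("user-1", List.of("clientid-1", "clientid-2"));
        records.put("user-2", List.of("clientid-1", "clientid-3"));
        return records;
    }

    // Inverts user -> clients into client -> users; TreeMap/TreeSet keep the output sorted.
    static Map<String, Set<String>> clientToUsers(Map<String, List<String>> userToClients) {
        Map<String, Set<String>> result = new TreeMap<>();
        userToClients.forEach((userId, clients) -> {
            for (String clientId : clients) {
                result.computeIfAbsent(clientId, k -> new TreeSet<>()).add(userId);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        System.out.println(clientToUsers(sampleRecords()));
        // prints {clientid-1=[user-1, user-2], clientid-2=[user-1], clientid-3=[user-2]}
    }
}
```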
Posted on 2020-03-10 15:42:25
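One approach is to maintain a dedicated changelog topic for the GlobalKTable; let's call it user_client_global_ktable_changelog. For simplicity, assume messages are deserialized into Java classes (you could just as well use a HashMap or a JsonNode):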
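import java.util.HashSet;
import java.util.Set;

//initial message format
public class UserClients {
    String id;
    Set<String> userClient;
    public String getId() { return id; }
    public Set<String> getUserClient() { return userClient; }
}
//message when key is client
public class ClientUsers {
    String clientId;
    //initialized here so that ClientUsers::new yields an empty, mutable set for the aggregator
    Set<String> userIds = new HashSet<>();
    public String getClientId() { return clientId; }
    public Set<String> getUserIds() { return userIds; }
}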
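import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder streamsBuilder = new StreamsBuilder();
//your initial topic
KStream<String, UserClients> userClientKStream = streamsBuilder.stream("un_keyed_topic");
//re-map initial message to user_id:{initial_message_payload}
userClientKStream
    .map((defaultNullKey, userClients) -> KeyValue.pair(userClients.getId(), userClients))
    .to("user_client_global_ktable_changelog"); //please provide appropriate serdes
userClientKStream
    //changes the key, so groupByKey() below re-partitions the data (creates an internal -repartition topic)
    .flatMap((defaultNullKey, userClients) -> userClients.getUserClient().stream()
        .map(clientId -> KeyValue.pair(clientId, userClients.getId()))
        .collect(Collectors.toList()))
    //we have to maintain a current aggregated store of user_ids for each client_id
    .groupByKey() //String key/value serdes are needed for the repartition topic
    .aggregate(ClientUsers::new, (clientId, userId, clientUsers) -> {
        clientUsers.getUserIds().add(userId);
        return clientUsers;
    }, Materialized.as("client_with_aggregated_user_ids"))
    .toStream()
    //values here are ClientUsers while the first branch writes UserClients to the same topic,
    //so that topic's serde must handle both shapes (e.g. JsonNode)
    .to("user_client_global_ktable_changelog"); //please provide appropriate serdes

For example, this is how user_ids are aggregated in the local state store: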
//re-keyed message (keyed by client id)
clientid-1:user-1
//current aggregate for `clientid-1`
"clientid-1"
{
"userIds": ["user-1"]
}
//re-keyed message (keyed by client id)
clientid-1:user-2
//current aggregate for `clientid-1`
"clientid-1"
{
"userIds": ["user-1", "user-2"]
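}

In fact, with a few changes we could use the local state store's changelog topic (i.e. the topic your_application-client_with_aggregated_user_ids-changelog) directly as the GlobalKTable's source topic, by adjusting the state so that it holds the payloads of both the user-keyed and the client-keyed messages.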
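The aggregation trace above can be reproduced with a stdlib-only Java simulation of the topology. It uses no Kafka, and all names are hypothetical. It is a simplification. In a real Kafka Streams application, the user-keyed branch and the flatMap/aggregate branch run in separate sub-topologies, the latter behind a repartition topic. The record cache may also coalesce intermediate aggregate updates. As a result, the actual order and number of changelog records can differ; what carries over is the final value per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Stdlib-only simulation (no Kafka) of the answer's topology: each input record is
// forwarded under its user id, and its clientId -> userId pairs are aggregated per
// client id, with every aggregate update also forwarded to the same changelog.
public class DualKeyChangelogDemo {

    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
        @Override
        public String toString() {
            return key + " -> " + value;
        }
    }

    static Map<String, List<String>> sampleInput() {
        Map<String, List<String>> input = new LinkedHashMap<>();
        input.put("user-1", List.of("clientid-1", "clientid-2"));
        input.put("user-2", List.of("clientid-1", "clientid-3"));
        return input;
    }

    static List<Rec> run(Map<String, List<String>> input) {
        List<Rec> changelog = new ArrayList<>();               // stands in for user_client_global_ktable_changelog
        Map<String, Set<String>> aggregates = new HashMap<>(); // stands in for client_with_aggregated_user_ids
        input.forEach((userId, clients) -> {
            changelog.add(new Rec(userId, clients.toString())); // map(...) then to(...)
            for (String clientId : clients) {                    // flatMap(...) to clientId -> userId
                Set<String> userIds = aggregates.computeIfAbsent(clientId, k -> new TreeSet<>());
                userIds.add(userId);                              // aggregate(...)
                changelog.add(new Rec(clientId, userIds.toString())); // toStream().to(...)
            }
        });
        return changelog;
    }

    public static void main(String[] args) {
        run(sampleInput()).forEach(System.out::println);
    }
}
```

Like the topology it mimics, the sketch only ever adds user ids. Removing a client from a user is not modeled. Assuming the topic holds only these records, replaying them yields clientid-1 -> [user-1, user-2], the same final value as in the trace.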
https://stackoverflow.com/questions/60613596