我使用的是Flink-SQL 1.13。目标是实时计算新用户的数量。
由于一些限制,我不能直接使用注册事件,因为在那里创建的帐户就像平台传递一样。一个帐户可以登录到多个游戏,对于每个游戏,用户在第一次进入该游戏时是new。所以我只能通过检查这个账号之前是否从登录日志中登录到这个游戏来计算这个值。登录日志格式如下:
user_id game_id login_time
111 game1 2021-05-13 01:01:01
111 game3 2021-05-23 02:02:02问题是登录日志量每天都在显著增加。虽然我可以将日志保存到HBase中,但总有一天它仍然会太大……
有没有其他方法可以做到这一点?也许我可以把历史用户放到redis hyperloglog中,但似乎Flink-SQL没有一个redis连接器yet...Thanks来帮助你提前……
发布于 2021-05-25 01:43:43
INSERT INTO first_login_stream (user_id, first_login_time)
SELECT
user_id,
FIRST_VALUE(login_time) first_login_time
FROM login_log
GROUP BY user_id返回到您的事件系统/ kafka中。您可以在窗口中回读一些每小时的统计数据(您可以将其保存在HBase中):
INSERT INTO hbase_stats
SELECT
window_start,
window_end,
count(user_id) user_count
FROM TABLE(
TUMBLE(
TABLE first_login_stream,
DESCRIPTOR(<kafka_ingestion_time>),
INTERVAL '1' HOUR
)
)
GROUP BY
window_start,
window_end它必须设置检查点/保存(否则您将在重启时导致完整的日志处理)。状态大小只会随着用户数的增加而增加,而不会随着登录次数的增加而增加(我认为。您应该验证这一点。)
https://stackoverflow.com/questions/67671366
复制相似问题