我们在ubuntu上使用汇合平台。我们通过cURL请求将简单的JSON数据发送到kafka主题上的kafka-rest服务器,名为"UE_Context“。
为此主题创建了一个名为"UE_CONTEXT_STREAM“的kafka流,命令如下:
CREATE STREAM UE_Context_Stream (ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', VALUE_FORMAT='JSON');为此主题创建了一个名为"UE_CONTEXT_TABLE“的kafka表,命令如下:
CREATE TABLE UE_Context_Table ( registertime BIGINT, ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', KEY='ue_key', VALUE_FORMAT='JSON');我使用以下cURL命令为主题提供了两行数据:
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x1234", "ecgi" : "1234"}}]}' "http://localhost:8082/topics/UE_Context"
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x4321", "ecgi" : "4321"}}]}' "http://localhost:8082/topics/UE_Context" 表上有一个select查询正在等待,如下所示:

当JSON数据被注入主题时,此查询将显示表信息。然后,我们停止将JSON数据注入主题,结束select查询并结束select查询。如果在稍后的时间点执行选择,则不会显示先前填充的表信息。没有办法保存这些数据吗?Kafka连接器和使用DB可能是一种选择。但是kSQL没有临时内存来存储表信息吗?
发布于 2018-12-07 02:02:17
在稍后的时间点执行选择,不显示先前填充的表信息。
select语句默认为主题的最新偏移量。
如果您想查看以前的数据,您需要将使用者偏移量设置回起始位置。
SET 'auto.offset.reset'='earliest';此外,如文件所述(重点)
SELECT语句本身就是一个non-persistent连续查询。SELECT语句的结果不会持久化到Kafka主题中,只会打印在KSQL控制台中。不要将CREATE创建的持久查询与SELECT语句的流查询结果混淆。
发布于 2021-01-19 18:32:39
https://stackoverflow.com/questions/53655597
复制相似问题