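I have data streaming into my topics from a sensor server, which I have no control over.
In topic A, payloads with data from several sensors arrive (a, b, c, d, ...).
In topic B, indicator messages arrive (e.g. 1, 2, ...) telling me that, from now on, the sensor data from topic A belongs to a new object x rather than to x-1.
I want to join the data from topic A with whichever object from topic B is current at that moment.
I'm very new to KSQL and stream-processing logic, so I don't know whether this is possible. It feels like there should be a very simple solution, but I haven't found one in the examples.
Edit: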
The sensor data (topic A) could look like this:
sensorPath | timestamp | value
simulation/machine/plc/sensor-1 | 1 | 7.0
simulation/machine/plc/sensor-2 | 1 | 2.0
simulation/machine/plc/sensor-1 | 2 | 6.0
simulation/machine/plc/sensor-2 | 2 | 1.0
...
simulation/machine/plc/sensor-1 | 10 | 10.0
simulation/machine/plc/sensor-2 | 10 | 12.0
The indicator data (topic B) could look like this:
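informationPath | timestamp | WorkpieceID
simulation/informationString | 1 | 0020181
simulation/informationString | 10 | 0020182
Basically, I want a new topic/stream in which each sensor reading is matched with its corresponding workpiece. Newly arriving sensor data always belongs to the most recent informationString/workpiece.
So topic C should look like this: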
sensorPath | SensorTimestamp | value | WorkpieceID
simulation/machine/plc/sensor-1 | 1 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 1 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2 | 1.0 | 0020181
...
simulation/machine/plc/sensor-1 | 10 | 10.0 | 0020182
simulation/machine/plc/sensor-2 | 10 | 12.0 | 0020182
So do I need something like a join on topicA.timestamp >= current(topicB.timestamp)?!
Posted 2018-11-07 23:30:32
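Yes, you can do this with KSQL. Here's a worked example. I'm using this Docker Compose file as my test environment, if you want to reproduce the example below.
First, populate some test data based on the sample you provided. I've based the timestamps on the current epoch, with the later readings offset by one and ten minutes: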
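The commands originally used to produce this data are not preserved here. As a rough, hypothetical sketch, the Python below builds equivalent JSON payloads, one per line, that could be piped into a console producer such as kafkacat. The mapping of the question's times 1, 2 and 10 to offsets of 0, 1 and 10 minutes is inferred from the timestamps in the PRINT output below, and the function names are illustrative only.

```python
import json
import time

# Offset in ms for each of the question's logical times (inferred, not from the original commands)
OFFSETS_MS = {1: 0, 2: 60_000, 10: 600_000}

SENSOR_SAMPLE = [
    ("simulation/machine/plc/sensor-1", 1, 7.0),
    ("simulation/machine/plc/sensor-2", 1, 2.0),
    ("simulation/machine/plc/sensor-1", 2, 6.0),
    ("simulation/machine/plc/sensor-2", 2, 1.0),
    ("simulation/machine/plc/sensor-1", 10, 10.0),
    ("simulation/machine/plc/sensor-2", 10, 12.0),
]
INDICATOR_SAMPLE = [(1, "0020181"), (10, "0020182")]


def sensor_payloads(base_ms):
    """JSON lines for the 'sensor' topic, with epoch-millisecond timestamps."""
    return [
        json.dumps({"sensorPath": path, "value": value, "timestamp": base_ms + OFFSETS_MS[t]})
        for path, t, value in SENSOR_SAMPLE
    ]


def indicator_payloads(base_ms):
    """JSON lines for the 'indicator' topic."""
    return [
        json.dumps({
            "informationPath": "simulation/informationString",
            "WorkpieceID": workpiece,
            "timestamp": base_ms + OFFSETS_MS[t],
        })
        for t, workpiece in INDICATOR_SAMPLE
    ]


if __name__ == "__main__":
    base = int(time.time()) * 1000  # current epoch, truncated to whole seconds, in ms
    for line in sensor_payloads(base):
        print(line)
```

Keeping WorkpieceID as a string preserves its leading zeros, which would be lost if it were sent as a JSON number.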
Now I launch the KSQL CLI:
docker run --network cos_default --interactive --tty --rm \
confluentinc/cp-ksql-cli:5.0.0 \
http://ksql-server:8088
In KSQL, we can inspect the source data in the topics:
KSQL> PRINT 'sensor' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
KSQL> PRINT 'indicator' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
Now we register the topics for use in KSQL and declare their schemas:
ksql> CREATE STREAM SENSOR (SENSORPATH VARCHAR, VALUE DOUBLE, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='sensor',TIMESTAMP='timestamp');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM INDICATOR (INFORMATIONPATH VARCHAR, WORKPIECEID VARCHAR, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='indicator',TIMESTAMP='timestamp');
Message
----------------
Stream created
----------------
We can query the KSQL streams we've created:
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , sensorpath, value FROM sensor;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-1 | 7.0
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-2 | 2.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-1 | 6.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-2 | 1.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-1 | 10.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-2 | 12.0
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , informationPath, WorkpieceID FROM indicator;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/informationString | 0020181
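1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/informationString | 0020182
Note that the ROWTIME in the streams differs from the ROWTIME in the PRINT output. This is because PRINT shows the Kafka message timestamp, whereas for the streams we override the timestamp in the WITH clause, so that ROWTIME is taken from the timestamp column in the message payload itself.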
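A minimal sketch of that distinction, assuming a simplified model of KSQL's behaviour: without a TIMESTAMP override the row time is the Kafka record timestamp, and with one it is read from the named payload field. The helper `row_time` is hypothetical, and `to_string` only approximates TIMESTAMPTOSTRING(..., 'yyyy-MM-dd HH:mm:ss Z') in UTC, which matches the +0000 offsets shown above.

```python
from datetime import datetime, timezone


def row_time(kafka_timestamp_ms, payload, timestamp_field=None):
    """Hypothetical model of which timestamp becomes ROWTIME."""
    if timestamp_field is None:
        return kafka_timestamp_ms  # no override: the Kafka message timestamp
    return payload[timestamp_field]  # TIMESTAMP='...' override: a field from the payload


def to_string(ms):
    # Approximates TIMESTAMPTOSTRING(ms, 'yyyy-MM-dd HH:mm:ss Z') for a UTC server
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %z")


payload = {"sensorPath": "simulation/machine/plc/sensor-1", "value": 7.0, "timestamp": 1541623171000}
print(row_time(1541624847072, payload))                          # 1541624847072, as in PRINT
print(to_string(row_time(1541624847072, payload, "timestamp")))  # 2018-11-07 20:39:31 +0000
```

Using event time from the payload is what lets the join later line readings up with indicators by when they happened, rather than by when they were produced to Kafka.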
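To join the two topics, we'll do two things:
1. Add an artificial join key that is common to both streams, so that every sensor reading and every indicator message carry the same key.
2. Model the indicator data as a KSQL table, which holds the current state of the WorkpieceID value.
To add the artificial join key, simply select a constant, alias it with an AS clause, and use it as the message key with PARTITION BY: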
ksql> CREATE STREAM SENSOR_KEYED AS SELECT sensorPath, value, 'X' AS JOIN_KEY FROM sensor PARTITION BY JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
Out of interest, we can look at the resulting Kafka topic:
ksql> PRINT SENSOR_KEYED FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":7.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":2.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":6.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":1.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":10.0,"JOIN_KEY":"X"}
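{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":12.0,"JOIN_KEY":"X"}
Note that ROWKEY is now the JOIN_KEY value, rather than null as in the PRINT 'sensor' output. If you omit PARTITION BY, the JOIN_KEY column is still added, but the messages remain unkeyed, which means the join will not work.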
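Why the key matters can be sketched with a toy partitioner: records are routed to a partition by a hash of their key, so both sides of a join must be keyed on the join column for matching records to meet on the same partition. This is an illustration only. Kafka's default partitioner hashes the serialized key with murmur2, whereas `zlib.crc32` is used below purely as a deterministic stand-in, and the partition count of 4 is hypothetical.

```python
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Toy key-based routing: the same key always maps to the same partition."""
    if key is None:
        return None  # unkeyed record: not routed by key, so no co-location is guaranteed
    return zlib.crc32(key.encode("utf-8")) % num_partitions


sensor_keys = ["X"] * 6     # SENSOR_KEYED rows after PARTITION BY JOIN_KEY
indicator_keys = ["X"] * 2  # INDICATOR_KEYED rows
print({partition_for(k) for k in sensor_keys + indicator_keys})  # one partition for all rows
```

The trade-off of a constant key is that all data lands on one partition, so the join is effectively processed by a single task; that is fine for one machine's feed but would limit parallelism at higher volumes.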
Now we re-key the indicator data too:
ksql> CREATE STREAM INDICATOR_KEYED AS SELECT informationPath, WorkpieceID, 'X' as JOIN_KEY FROM indicator PARTITION BY JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
ksql> PRINT 'INDICATOR_KEYED' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020181","JOIN_KEY":"X"}
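{"ROWTIME":1541623771000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020182","JOIN_KEY":"X"}
With the indicator data re-keyed, we can now register it as a KSQL table. For a table, KSQL returns the current state for each key rather than every event. We use this to determine which WorkpieceID a sensor reading belongs to, based on its timestamp.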
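Conceptually, a table is the topic read as a changelog and folded into the latest value per key. The sketch below models that in Python; `materialize` is a hypothetical name, and treating a `None` value as a delete mirrors Kafka's tombstone convention for table topics.

```python
def materialize(changelog):
    """Fold (key, value) records into the latest value per key, like a KSQL table."""
    state = {}
    for key, value in changelog:
        if value is None:
            state.pop(key, None)  # tombstone: remove the key
        else:
            state[key] = value  # a newer record for the same key replaces the older one
    return state


indicator_keyed = [
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020181"}),
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020182"}),
]
print(materialize(indicator_keyed)["X"]["WORKPIECEID"])  # 0020182
```

With the constant key X, the table holds exactly one row, the most recent workpiece, which is the "current object" the question asks about.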
ksql> CREATE TABLE INDICATOR_STATE (JOIN_KEY VARCHAR, informationPath varchar, WorkpieceID varchar) with (value_format='json',kafka_topic='INDICATOR_KEYED',KEY='JOIN_KEY');
Message
---------------
Table created
---------------
Querying the table shows a single value, which is the current state:
ksql> SELECT * FROM INDICATOR_STATE;
1541623771000 | X | X | simulation/informationString | 0020182
If you now send another message to the indicator topic, the table's state will update and you'll see a new row emitted by the SELECT.
Finally, we can do a stream-table join and persist the result to a new topic:
ksql> CREATE STREAM SENSOR_ENRICHED AS SELECT S.SENSORPATH, TIMESTAMPTOSTRING(S.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SENSOR_TIMESTAMP, S.VALUE, I.WORKPIECEID FROM SENSOR_KEYED S LEFT JOIN INDICATOR_STATE I ON S.JOIN_KEY=I.JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
Inspect the new stream:
ksql> DESCRIBE SENSOR_ENRICHED;
Name : SENSOR_ENRICHED
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SENSORPATH | VARCHAR(STRING)
SENSOR_TIMESTAMP | VARCHAR(STRING)
VALUE | DOUBLE
WORKPIECEID | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Query the new stream:
ksql> SELECT SENSORPATH, SENSOR_TIMESTAMP, VALUE, WORKPIECEID FROM SENSOR_ENRICHED;
simulation/machine/plc/sensor-1 | 2018-11-07 20:39:31 +0000 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:39:31 +0000 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:40:31 +0000 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:40:31 +0000 | 1.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:49:31 +0000 | 10.0 | 0020182
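simulation/machine/plc/sensor-2 | 2018-11-07 20:49:31 +0000 | 12.0 | 0020182
Because this is KSQL, the SENSOR_ENRICHED stream (and the underlying topic of the same name) is populated continuously, driven by events arriving on the sensor topic, and reflects any change of state caused by events sent to the indicator topic.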
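That continuous behaviour can be modelled as a small event loop: indicator events update the table, and each sensor event is left-joined against the table's value at that moment. This is a simplified Python model, not KSQL's implementation. It assumes events are processed in ROWTIME order and, as a further assumption, applies an indicator update before sensor readings that share its timestamp, which reproduces the output above; in a real deployment, readings that coincide with an indicator change depend on actual processing order.

```python
T0 = 1541623171000  # first ROWTIME from the example

# (rowtime, kind, join_key, body); sensor body = (sensorPath, value), indicator body = WorkpieceID
EVENTS = [
    (T0, "sensor", "X", ("simulation/machine/plc/sensor-1", 7.0)),
    (T0, "sensor", "X", ("simulation/machine/plc/sensor-2", 2.0)),
    (T0 + 60_000, "sensor", "X", ("simulation/machine/plc/sensor-1", 6.0)),
    (T0 + 60_000, "sensor", "X", ("simulation/machine/plc/sensor-2", 1.0)),
    (T0 + 600_000, "sensor", "X", ("simulation/machine/plc/sensor-1", 10.0)),
    (T0 + 600_000, "sensor", "X", ("simulation/machine/plc/sensor-2", 12.0)),
    (T0, "indicator", "X", "0020181"),
    (T0 + 600_000, "indicator", "X", "0020182"),
]


def enrich(events):
    """Yield (sensorPath, value, WorkpieceID) for each sensor event, stream-table style."""
    table = {}  # current WorkpieceID per join key
    # Order by rowtime; on ties, indicator updates sort first (False < True)
    for _, kind, key, body in sorted(events, key=lambda e: (e[0], e[1] != "indicator")):
        if kind == "indicator":
            table[key] = body
        else:
            path, value = body
            yield path, value, table.get(key)  # LEFT JOIN: None if no indicator yet


for row in enrich(EVENTS):
    print(row)
```

Only sensor events produce output, matching the stream-table join: a new indicator changes what later readings are joined with but emits nothing by itself.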
https://stackoverflow.com/questions/53175852