我正在尝试通过KsqlDb将数据从一个主题(读主题)转换到另一个主题(写主题)。
这是一直在生成的要读取的数据-topic
{
"orderNumber": "01235656",
"deliveryBarcode": "733998877",
"requestId": "1616516663000",
"status": "APPROVED_BY_SUPERVISOR"
}我写了这些ksqldb查询:
-- The general stream to read the topic is like this:
CREATE STREAM GENERAL_STREAM (
deliveryBarcode VARCHAR,
orderNumber VARCHAR,
requestId VARCHAR,
status VARCHAR
) WITH (
kafka_topic = 'read-topic',
value_format = 'json'
);
-- This is the stream to redirect the filtered data throgh 'write-topic'
CREATE STREAM REDIRECTION_STREAM
WITH (
partitions = 6,
replicas = 3,
kafka_topic = 'write-topic',
value_format = 'json'
) AS
SELECT
AS_VALUE(requestId) `requestId`,
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
FROM EARTH_DELIVERY_COURIER_PUDOPACKAGESTATUSUPDATED_0
WHERE (payload -> status = 'APPROVED_BY_SUPERVISOR')
EMIT CHANGES;但我的查询不能工作,因为这一部分:
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`我期望的write-topic数据是这样的
{
"requestId": "1616516663000"
"packages":[
{
"ordernumber":"01235656",
"barcodenumber":"733998877"
}
]
}我应该如何修改这些查询,以便能够以预期的数组格式生成'packages‘字段?
发布于 2021-03-24 21:45:05
您可以尝试使用数组()和as_map()函数来生成预期的输出。
下面是我用来做这件事的CSAS:
CREATE STREAM REDIRECTION_STREAM
WITH (kafka_topic='write_topic', value_format='json')
AS SELECT
AS_VALUE(requestId) requestId,
ARRAY[
AS_MAP(
ARRAY['deliverybarcode', 'ordernumber'],
ARRAY[deliverybarcode, ordernumber]
)
] packages
FROM GENERAL_STREAM;以下是上述主题的输出
print 'write_topic' from BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/24 13:24:44.557 Z, key: <null>, value: {"REQUESTID":"1616516663000","PACKAGES":[{"ordernumber":"01235656","deliverybarcode":"733998877"}]}, partition: 0
^CTopic https://stackoverflow.com/questions/66781769
复制相似问题