我想在卡夫卡做这样的事:
我的应用程序接收特定的事件/数据,只从上面的存储区发送特定的数据集到主题. KStream/Ktable/Kafka-store
我们能用卡夫卡做这个吗?我不认为单独使用Kafka消费者会有帮助,因为当一组数据已经被消耗时,我们无法启动/暂停消费者。
发布于 2021-12-06 06:59:13
以下是流创建和连接流创建配置:-
CREATE STREAM Customer Profile WITH (
KAFKA_TOPIC = 'Customer Profile',
VALUE_FORMAT = 'AVRO',
KEY_FORMAT = 'AVRO');
CREATE STREAM Customer Buy WITH (
KAFKA_TOPIC = 'Customer Buy',
VALUE_FORMAT = 'AVRO',
KEY_FORMAT = 'AVRO');
在这里,我用另一个主题创建了一个Kafka流,这个主题是从MYSQL表创建的。
一旦流创建,我们需要在应用连接和筛选之后创建另一个流,但是流中的连接具有只能应用两个流的条件。这是联合前:-
CREATE STREAM account_summary_by_category_by_month_by_accunt_no(
account_no int,
date_time timestamp,
category text,
category_description text,
amount float,
customer_id int,
FROM Customer_Profile cp INNER JOIN Customer_Buy cb
WITHIN 7 DAYS
ON cp.id = cb.order_id;您可以像SQL一样将所有ksql函数应用于筛选数据。
https://stackoverflow.com/questions/70241418
复制相似问题