) WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_province', 'properties.bootstrap.servers' = 'kms )WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category1', 'properties.bootstrap.servers' = 'kms )WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category2', 'properties.bootstrap.servers' = 'kms )WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category3', 'properties.bootstrap.servers' = 'kms ) WITH( 'connector' = 'kafka', 'topic' = 'mydw.order_detail', 'properties.bootstrap.servers' = 'kms
topic名称 canal.mq.topic=test 修改conf/canal.properties,修改内容如下: # 配置zookeeper地址 canal.zkServers =kms-2:2181,kms # 可选项: tcp(默认), kafka, RocketMQ, canal.serverMode = kafka # 配置kafka地址 canal.mq.servers = kms-2:9092,kms 启动kafka控制台消费者测试 bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092 --
) WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_province', 'properties.bootstrap.servers' = 'kms )WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category1', 'properties.bootstrap.servers' = 'kms )WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category2', 'properties.bootstrap.servers' = 'kms )WITH( 'connector' = 'kafka', 'topic' = 'mydw.base_category3', 'properties.bootstrap.servers' = 'kms ) WITH( 'connector' = 'kafka', 'topic' = 'mydw.order_detail', 'properties.bootstrap.servers' = 'kms
Properties props = new Properties(); props.setProperty("bootstrap.servers", "kms-2:9092,kms ; // only required for Kafka 0.8 props.setProperty("zookeeper.connect", "kms-2:2181,kms
偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms
sink a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.sink1.brokerList=kms-2:9092,kms props = new Properties(); // kafka broker地址 props.put("bootstrap.servers", "kms-2:9092,kms