我有保存到HBase HTABLE的代码。预期的行为是该表将每个分区的提交或“刷新”提交到hbase。rdd.foreachPartition(p => { val mutator = connection.getBufferedMutator(TableName.valueOf(HTABLE))
val hRow = new Put(rowkey)
hRow.addCol
item.getKey()+" and value is : "+item.getValue())现在,通过调用下面的构造函数,我创建了一个以KafkaTemplate为false的autoFlushpublic KafkaTemplate(mykafkaProducerFactory, boolean autoFlush)
现在,我有了一个异步生成器,在10秒内生成10条消息。