
在创建KafkaProducer时需要传入Properties指定一些参数
bootstrap.servers: kafka集群各个服务器地址key.serializer: 用来序列化key的实现org.apache.kafka.common.serialization.Serializer接口的类(如果没有指定partition,相同key会发送到同一分区)value.serializer: 用来序列化value的实现org.apache.kafka.common.serialization.Serializer接口的类buffer.memory: 整个Producer使用的内存,默认大小为32M,这些内存可以用于RecordAccumulator,NetWorkClient中未确认的请求等。batch.size: 当多条记录发送到同一partition时,producer将会尝试将这些记录批处理到一起,及当记录累计达到batch.size后再一起发送,默认大小为16K。可与linger.ms配合使用。linger.ms: 当partition中的记录迟迟达不到batch.size的大小时,如果不设置超时时间则这些记录可能一直阻塞,设置linger.ms可以让记录在超时后发送而不会堆积,默认为0ms即立即发送,通常可设置范围到`5~100ms``acks: NetworkClient发送请求消息到kafka cluster后cluster的应答机制,取值可以为0, 1, -1或all,其中-1和all是一样的。retries: NetworkClient发送请求消息的失败重试次数。replica.lag.time.max.ms: ISR队列中节点的同步时间,超出这个时间不同步则会被移除ISR队列。min.insync.replicas: ISR队列的最小应答数enable.idempotence: 开启幂等性max.in.flight.requests.per.connection: 在阻塞前,每个broker最多缓存5个未确认的请求,第6个请求进来时不发送,直到有未确认的请求得到确认。acks取值可以为0, 1, -1或all,其中-1和all是一样的。
0: producer发送过来的数据,不需要等待数据持久化就立马应答
1: producer发送过来的数据,Leader收到数据后持久化成功后就应答
-1或all: producer发送过来的数据,Leader和ISR队列满足最小应答数后才应答,其中ISR队列满足最小应答数可以通过min.insync.replicas参数设置。
ISR队列指的是和Leader保持同步的Leader和Follower的集合,注意这里也包括Leader自己。eg:(leader:0, isr:0, 1, 2)
acks=0,可靠性低一般不使用
acks=1: Leader会应答,可靠性高一点点,但如果在Leader收到数据后节点就挂了则也会丢失数据,一般用于日志传输的场景,允许丢一点数据
acks=-1: 可靠性很强,但效率非常低。并且在没开启幂等性的情况下会产生重复数据。如Leader将数据同步到isr后,返回应答的时候挂了,这时候新的Leader出现,producer重试又发送了数据,导致数据重复。

acks=-1 + 分区副本(leader+follower)数大于等于2 + ISR应答的最小副本数大于等于2。数据不丢失,但不开启幂等性的情况下数据可能会重复。acks=1。数据不会重复,但可能会丢失。幂等性的判断标准: 具有<PID,Partition,SeqNumber>相同三元组的消息,broker只持久化一次
PID: 每个新的Producer在初始化的时候会被分配一个唯一的PIDPartition: 消息要发往的分区SeqNumber: 对每个PID,Producer发送时会让<topic,partition>对应一个单调递增的SeqNumber,并且broker也会缓存Producer的SeqNumber,在broker收到消息时,若SeqNumber比当前缓存的值小,则把消息丢弃,否则接受判重。幂等性只能保证单会话,单分区消息不重复
// 初始化事务
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 在事务内提交已经消费的偏移量(消费者使用)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务
void abortTransaction() throws ProducerFencedException;Consumer消费多分区数据整体不能是有序的,可以做到有序消费单分区数据,但有前置条件。
1.x版本之前,需要保证max.in.flight.requests.per.connection=1
1.x版本之后
max.in.flight.requests.per.connection=1max.in.flight.requests.per.connection<=5
在1.x版本后,kafka会缓存producer发送过来在的5个请求的数据,并对其进行排序。从而保证5个请求内的消息在kafka内是有序的。
bootstrap.servers: kafka集群各个服务器地址key.deserializer: 用来反序列化key的实现org.apache.kafka.common.serialization.Deserializer接口的类value.deserializer: 用来反序列化value的实现org.apache.kafka.common.serialization.Deserializer接口的类partition.assignment.strategy: 消费者组的分区策略,有RangeAssignor,RoundRobinAssignor,StickyAssignor, CooperativeStickyAssignor。也可以通过自己实现ConsumerPartitionAssignor接口,多个分区策略可以混合使用。默认RangeAssignor+ CooperativeStickyAssignogroup.id: 消费者组idenable.auto.commit: 消费者是否自动提交offset,默认是trueauto.commit.interval.ms: 消费者自动提交offset的时间间隔,默认为5000 (5 seconds)。max.poll.records: 对poll()的单个调用中返回的最大记录数Reference:
先对单个topic的各个分区partition以及各个消费者组里的各个consumer按序号排序。然后用partition数/consumer数得到每个消费者需要消费的分区数量,然后每次将连续的partition按相除的数量分配给每个consumer。这里可能会出现不能整除的情况,多出来的余数个分区则按顺序分给每个消费者,也就意味着有一些消费者会多消费一个分区。示意图:

缺点:
这种分配算法会造成数据倾斜,即会造成某个consumer压力过大。对于少个topic来说,consumer对于每个topic多消费一个分区问题不大,如果kafka里有很多的topic,而这些topic多出来的分区都要由排序靠前的consumer来承当,则会造成这些consumer的负载压力要比其他的大的多。
RangeAssignor是针对单个topic的,而RoundRobinAssignor是针对所有topic的,RoundRobinAssignor会将所有topic的分区partition和消费者排序,然后轮询所有的partition分配给每个消费者。如果订阅的topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些topic的分配。

在RangeAssignor和RoundRobinAssignor中,当有consumer挂掉时都会做重分配rebalance,即重新分配每个消费者对应消费哪个分区,重分配后他们消费的分区可能会和rebalance前的差别较大。StickyAssignor就是尽量做到rebalance前后消费者负责的分区不发生较大变化,即每次重分配的分配结果尽量和上一次的保持一致。并且相比前面两种策略其在分区的时候尽可能做到随机分配而不是顺序分配。
总之,StickyAssignor的目标有两点:
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。
上述三种分区分配策略均是基于eager协议,Kafka2.4.0开始引入CooperativeStickyAssignor策略。CooperativeStickyAssignor与之前的StickyAssignor虽然都是维持原来的分区分配方案,最大的区别是:StickyAssignor仍然是基于eager协议,分区重分配时候,都需要consumers先放弃当前持有的分区,重新加入consumer group;而CooperativeStickyAssignor基于cooperative协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。
具体解析见Reference2
为了能够记录consumer group消费某topic的进度,kafka采用了offset来记录消费进度。
在Kafka 0.9之前,这些offset信息是保存在zookeeper中的,在0.9后则保存到kafka的一个内置的topic,__consumer_offsets中。该topic有50个分区。默认是无法读取的,可以通过设置exclude.internal.topics=false来读取。

默认情况下消费者的offset是自动提交的,可通过enable.auto.commit来设计是否开启以及设置auto.commit.interval.ms设置自动提交的时间间隔。
消费者可以手动提交offset,方式可以是异步和同步,同时也可以指定offset的位置开始消费(可通过时间来找到指定offset然后开始消费,如消费从一天前的现在对应的offset,对应api为offsetsForTimes开始消费)。
partition: 消息的分区
log: 逻辑概念,指的是保存分区信息的文件综合
segement: 每个log由多个segement组成,每个segement最大为1G,其由xxx.index,xxx.log,xxx.timeindx组成,log文件保存的是消息,其中包括消息的大小,offset等。而index消息保存的其log的稀疏索引,而timeindex时间戳文件保存的是当前日志段的最大的时间戳以及该时间戳对应的偏移量。
segement的大小可以通过log.segment.bytes设置,默认1G

创建副本存储计划
{
"version":1,
"partitions":[
"topic":"test", // 修改哪个主题
"partition":0, // 修改主题里哪个分区
"replicas":[0,1,2] // 副本要放在哪些broker
},
{
"topic":"test",
"partition":1,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":2,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":3,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":4,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":5,
"replicas":[0,1,2]
},
{
"topic":"test",
"partition":6,
"replicas":[0,1,2]
},
]
}执行计划
bash ./kafka-reassign-partitions.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --reassignment-json-file define-create-partition.json --execute验证
bash ./kafka-reassign-partitions.sh --bootstrap-server kafka1:9091,kafka2:9092,kafka3:9093 --reassignment-json-file define-create-partition.json --verifykafka数据默认保存7天,可以调整以下参数修改:
log.retention.hours: 最低优先级,按小时数来清理,默认168(7天)log.retenion.minutes: 分钟log.retenion.ms: 最高优先级,毫秒级别log.retenion.check.interval.ms: 负责设置的检查文件是否清理的周期,默认5分钟当超时后,kafka有两种策略要处理消息,分别是delete和compact,可以通过修改以下参数来设定超时删除策略
log.cleanup.policy: 当segement超出大小之后的默认清理策略,其中有效的策略选项可以使用逗号分开。有效的策略选项有delete和compact。默认deletedelete
默认开启了按照时间删除过期时间。其是按照xxx.timeindex记录的时间来标志的,因此基于时间的删除是以该segement的所有记录的最大时间戳来作为该文件的时间戳来删除的。
同时delete也提供了基于大小的删除配置,其配置参数如下:
log.retention.bytes: 即日志文件达到多大则删除,默认为-1即不限制,这个选项的值如果小于segment文件大小的话是不起作用的。segment文件的大小取决于log.segment.bytes配置项,默认为1G。另外,Kafka的日志删除策略并不是非常严格的(比如如果log.retention.bytes设置了10G的话,并不是超过10G的部分就会立刻删除,只是被标记为待删除,Kafka会在恰当的时候再真正删除),所以请预留足够的磁盘空间。当磁盘空间剩余量为0时,Kafka服务会被kill掉。
Reference: https://wiki.eryajf.net/pages/9fc4fa/#%E6%8C%89%E6%97%B6%E9%97%B4%E5%88%A0%E9%99%A4
compact
在compact中,若数据过期,则采用压缩形式。具体是保留每一个key的最后一个版本的数据。(在Producer发送时可以指定key,相同的key会被发到同一个partition)。如下图:

具有相同key的数据只保留最新的那个版本。压缩后的offset不连续,当consumer消费的offset找不到后会向上找到最近的一个offset开始消费。如当尝试从offset=5开始消费时,其会找到offset=6。
Reference: https://juejin.cn/post/6863050320646406158
kafka会把每个topic分为多个partition,并行处理加快速度。
kafka写log日志时采用的是文件追加的形式,顺序读写速度快。并且由于log分成了多个segement,segement删除时都是整块删除的,这也避免了对文件的随机读写操作。并且在定位数据log时也可以采用xxx.index提供的稀疏索引来快速定位。
broker网络IO数据持久化到磁盘(Producer->broker)中以及broker消息发送到网络IO中(broker->Consumer)

而采用零拷贝后,如mmap(),则可以减少两次CPU拷贝

Kafka 在这里采用的方案是通过 NIO 的 transferTo/transferFrom 调用操作系统的 sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝。sendfile仅将内核空间缓冲区中对应的数据描述信息(文件描述符、地址偏移量等信息)记录到socket缓冲区中。

Producer数据批处理
设定batch.size来让发往同一分区的数据批处理发出。
Producer对消息的压缩
Producer可以采用各种算法对消息进行压缩,有Snappy、Gzip、LZ4等