一.高可用的由来

1.1 为什么需要Replication

在Kafka在0.8之前的版本中,是没有Replication,一旦某个broker宕机,则其上的所有Partition数据都不可被消费,这与Kafka的数据持久化和Delivery Guarantee设计原则相违背。

如果Producer使用同步模式则Producer则会在尝试重新发送message.send.max.retries(默认值是3)次后抛出异常,用户可以选择停止发送后续数据或者继续发送。而前者会造成数据的阻塞,后者会造成发送到Kafak数据的丢失。

如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值是3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现问题。同时,Kafka的Producer并没有对异步模式提供fallback接口。

由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作会曹成整个系统的可用性的降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,由此可见,Replication的引入是很有必要的。

1.2 Leader Election

引入Replication之后,同一个Partition可能会有多个Replica,而在这些之中需要选取一个Leader,Producer和Comsumer只会和这个Leader交互,其他作为Follower从Leader中复制数据。

因为需要保证同一个Partition中多个副本之间的数据一致性(其中一个宕机后其他副本必须继续服务,不能造成数据重复也不能造成数据丢失)。如果没有一个leader,所有副本都可同时读写数据,则需要多个副本之间联通,数据一致性和有序性非常难保证。引入leader之后,leader负责读写,follower只需要从leader中fetch数据。

二.Kafka 高可用设计分析

2.1 如何将所有Replica均匀分布整个集群

为了做好负载均衡,Kafka尽量将所有Partition均匀分配到整个集群中。一个典型的部署方式是一个Topic的Partition数量大与Broker数量。同时为了提高Kafka的容错机制,也需要同一个Partition的Replica尽量分布到不同的机器上。分配算法如下:

1.将所有的Broker(n个)和待分配的Partition排序

2.将第i个Partition分配到i mod n的Broker上

3.将第i个Partition的第j个Replica分配到((i+j)) mod n)个Broker上

2.2 副本策略

2.2.1 消息传递同步策略

Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,无论这个该Topic的Replication的副本为多少,Producer只将该消息发送该Partition的Leader。Leader会将该消息写入本地Log。每个Follower都从Leader Pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入Log,向Leader发送ACK。一旦Leader收到ISR中所有的Replica的ACK,该消息就认为Commit,Leader将增加HW并且向Producer发送ACK。

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,没有必要等待ISR中的所有follower接受成功,所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择一下配置:

  • 0:producer不需要等待broker的ack,这个提供一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障的时候很有可能丢失数据

  • 1:producer等待broker的ack,partition的leader成功后返回ack,如果在followe同步成功之前leader故障,那么将会丢失数据

  • -1:producer等待broker的ack,partition的leader和followe全部成功后才会返回ack。如果在followe同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。

为了提高性能,每个Follower在接受到数据后立即向Leader发送ACK,而非等待数据写入Log中。因此,对于已经Commit的消息,Kafak只能保证它被存于Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该消息一定会被Consumer消费。

Consumer读消息也是从Leader读取,只有被commit过后的消息才会暴露给Consumer。

2.2.2 ACK前需要保证有多少个备份?

Leader收到数据,所有Follower开始同步数据,但是有一个Follower因为某种故障,迟迟不能与Leader同步,那么Leader就一直要等下去,直到它完成同步,才能发送ACK,这个怎么解决?

Leader维护一个动态的in-sync replica set(ISR),意和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ACK。如果follower长时间未向Leader同步数据,则该Follower将被剔除ISR。该时间阀值将由replica.lag.time.max.ms参数设定。Leader发生了故障之后,就会从ISR中选举新的Leader。

Kafak的复制机制既不是完全的同步复制,也不是单纯的异步复制。

  • 同步复制:要求将所有的工作的Follower都复制完,这条消息才会被认为commit。

  • 异步复制:Follower异步的从Leader复制数据,数据只要被Leader写入Log就认为已经commit,这种情况下如果Follower都复制完都落后Leader,如果Leader突然宕机,则会丢失数据。

一条下次只有被ISR所有Follower都会从Leader复制过去才会认为已提交。对于Producer而言,可以选择是否等待数据是否commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或一个以上Follower,一条commit就不会丢失。

在ISR中至少有一个Follower时,Kafa可以确保已经commit的数据不丢失,但是某个Partition的所有副本都宕机了,那么就无法保证数据不丢失了,这种情况下有两种可行的方案:

  • 等待ISR中任意一个副本活过来,并选择这个为Leader

  • 等待第一个活过来的副本(不一定是ISR中的)作为Leader

这个就是可用性和一致性中做个简单的折中。如果一定要等待ISR中的副本活过来,那不可用的时间就有可能相对较长。而且如果ISR中的所有副本都无法活过来,或者数据都丢失了,这个Partition将永久不可用了。选择第一个活的Replica作为Leader,而这个Replica不是ISR中的,那它并不保证已经包含了所有commiy的信息,它也会成为Leader。根据Kafka文档,在以后的文档中,Kafka支持用户通过配置选择两种方式中的一种,从而根据不同的使用场景选择高可用还是强一致性。

2.2.3 Leader选举方法

一种选举的方式是少数服从多数,但是Kafka并没有采取这个方式。在这种方式,如果我们有2f+1个副本(包含Leader和Follower),那在commit之前必须保证有f+1个副本复制完消息,为了保证选取新的Leader,fail的副本不能超过f个,因为在剩下的f+1个中,至少有一个副本包含最新的所有消息。但是这个方法所能容忍的fail的副本数量个数比较少。如果有容忍1个挂掉,必须要有3个副本。如果要容忍2个挂掉,就需要5个副本。大量的副本在大数据量下导致性能的急剧下降。

Kafka在Zookeepeer中动态维护了一个ISR,这个ISR中所有的副本都跟上Leader,只有ISR中的成员才能有被选为Leader的可能。在这种模式下,对于f+1个副本,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个副本的失败。在大多数使用场景中,这种模式是非常有利的。

将在所有Broker中选取一个Controller,所有Partition的Leader选举都由这个Controller决定。Controller会将Leader的改变直接通过RPC方式通知需要为此作相应的Broker。同时Controller也负责增删Topic以及Replica的重新分配。

2.2.4 Leader与Follower的数据一致性

LEO(Log End Offset):每个副本最大的Offset

HW(High Watermart):消费者能够见到的最大的offset,ISR中的最小LEO

follower发生故障:会临时提出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR。

leader发生故障:会从ISR中选取一个新的leader,之后,为保证多个副本之间的数据一致,其余的follower会先将各自的log文件高于HW部分截取掉,然后从Leader中同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失和不重复