介绍Apache Kafka作为一个分布式流处理平台和消息中间件,如何支持大规模数据的实时传输和处理。讲解Kafka的核心组件,如Broker、Producer、Consumer、Topic和Partition,以及它们如何协作实现高吞吐量和高可用性的数据流。
chou403
/ Kafka
/ c:
/ u:
/ 60 min read
一学一个不吱声
Kafka是什么
Kafka是一种高吞吐量的分布式发布订阅消息系统(消息引擎系统),它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
其实我们简单点理解就是系统A发送消息给kafka(消息引擎系统),系统B从kafka中读取A发送的消息。而kafka就是个中间商。
消息系统简介
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:** 点对点传递模式,发布-订阅模式。大部分的消息系统选用发布-订阅模式**。Kafka就是一种发布-订阅模式。
点对点消息传递模式
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
生产者发送一条消息到queue,只有一个消费者能收到。
发布-订阅消息传递模式
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
Kafka 简单理解
上面我们提到kafka是个中间商,我们为什么不能去掉这个中间商呢,凭着我们的想象也会觉得去掉这些消息引擎系统会更好吧,那我们来谈谈消息引擎系统存在的意义:
原因就是”削峰填谷”。这四个字简直比消息引擎本身还要有名气。
所谓的”削峰填谷”就是指缓冲上下游瞬时突发流量,使其更平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下游系统可能会直接被压垮导致全链路服务”雪崩”。但是,一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的”峰”填满到”谷”中,避免了流量的震荡。消息引擎系统的另一大好处在于发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。
我们想象一下在双11期间我们购物的情景来形象的理解一下削峰填谷,感受一下Kafka在这中间是怎么去”抗”峰值流量的吧:
当我们点击某个商品以后进入付费页面,可是这个简单的流程中就可能包含多个子服务,比如点击购买按钮会调用订单系统生成对应的订单,而处理该订单会依次调用下游的多个子系统服务 ,比如调用支付宝和微信支付的接口,查询你的登录信息,验证商品信息等。显然上游的订单操作比较简单,所以它的 TPS(每秒事务处理量) 要远高于处理订单的下游服务,因此如果上下游系统直接对接,势必会出现下游服务无法及时处理上游订单从而造成订单堆积的情形。特别是当出现类似于秒杀这样的业务时,上游订单流量会瞬时增加,可能出现的结果就是直接压跨下游子系统服务。
解决此问题的一个常见做法是我们对上游系统进行限速,但这种做法对上游系统而言显然是不合理的,毕竟问题并不出现在它那里。所以更常见的办法是引入像 Kafka 这样的消息引擎系统来对抗这种上下游系统 TPS 的错配以及瞬时峰值流量。
还是这个例子,当引入了 Kafka 之后。上游订单服务不再直接与下游子服务进行交互。当新订单生成后它仅仅是向 Kafka Broker 发送一条订单消息即可。类似地,下游的各个子服务订阅 Kafka 中的对应主题,并实时从该主题的各自分区(Partition)中获取到订单消息进行处理,从而实现了上游订单服务与下游订单处理服务的解耦。这样当出现秒杀业务时,Kafka 能够将瞬时增加的订单流量全部以消息形式保存在对应的主题中,既不影响上游服务的 TPS,同时也给下游子服务留出了充足的时间去消费它们。这就是 Kafka 这类消息引擎系统的最大意义所在。
Kafka 的优点特点
-
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的,基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
-
冗余(副本)
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
-
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码,不需要调节参数。扩展就像调大电力按钮一样简单。
-
灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
-
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
-
顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
-
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
-
异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
Kafka中的术语解释概述
下图展示了Kafka的相关术语以及之间的关系:
- 上图中一个topic配置了3个partition。Partition1有两个offset: 0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
- 如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
broker
- Kafka 集群包含一个或多个服务器,服务器节点称为broker。
- broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
- 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
- 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic
- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
- 类似于数据库的表名。
Partition
- topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Producer
- 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
- 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
- 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Leader
- 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower
- Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉,卡住或者同步太慢,leader会把这个follower从”in sync replicas”(ISR)列表中删除,重新创建一个Follower。
Kafka架构
如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU,Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Topics和Partition
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示:
符合删除条件的日志文件的最小时间
log.retention.hours=168
日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。
log.segment.bytes=1073741824
检查日志段的时间间隔,以确定它们是否可以根据保留策略被删除
log.retention.check.interval.ms=300000
如果设置了log.cleaner.enable =true,则清理器将被启用,然后可以为日志压缩标记单个日志。
log.cleaner.enable=f
因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offset由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
Producer消息路由
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Partition。Paritition机制可以通过指定Producer的paritition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。
Consumer Group 消费群体
使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
Push与Pull
作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
Kafka delivery guarantee
有这么几种可能的delivery guarantee:
At most once 消息可能会丢,但绝不会重复传输
At least one 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
当Producer向broker发送消息时,一旦这条消息被commit,因为replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。
接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易的使用这种方式。
ack 机制
producer端设置request.required.acks
。
- request.required.acks = 0: 只要请求已发送出去,就算是发送完了,不关心有没有写成功。性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。吞吐量高。
- request.required.acks = 1(默认): 发送一条消息,当leader partition写入成功以后,才算写入成功。不过这种方式也有丢数据的可能。
- request.required.acks = -1/all: 需要ISR列表里面,所有 replica 都写完以后,这条消息才算写入成功。这才是 ISR 的正确应用场景,可靠性最高。
ISR 的最坏情况
排除所有 replica 全部故障,ISR 的最坏情况就是 ISR 中只剩 leader 自己一个了。退化成 ack=1 的情况了,甚至还不如 ack=1。ack=1,说的是 producer 不等服务器完全同步完 ISR,只要 leader 写入成功就行了,但是可没说不进行同步了。该有的同步过程还是会进行的,但凡能同步,kafka 肯定会同步的,而 ack=1 的最坏情况,也是 ISR 只剩下 leader 了。换句话说,producer 为了提高吞吐量,没等 ISR 全部同步,但是心里还是希望接口同步完成的。而这种 leader 孤家寡人的最坏情况,书上说”退化成 ack=1”,不足以说明问题的严重性。
ISR 的最坏情况,会使 ack=-1 退化成 ack=1 的最坏情况,完全背离我们设置-1 的初衷(因为特定是同步不了了)。
数据不丢失的方案:
- 分区副本 >= 2
- acks = -1
- min.insync.replicas >= 2
下面给出此时leader出现故障的情况,可以看出,此时数据可能重复。
Leader维护了⼀个动态的 in-sync replica set(ISR): 和 Leader 保持同步的 Follower 集合。当 ISR 集合中的 Follower 完成数据的同步之后,Leader 就会给 Follower 发送 ACK。如果 Follower ⻓时间未向 Leader 同步数据,则该 Follower 将被踢出 ISR 集合,该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发⽣故障后,就会从 ISR 中选举出新的 Leader。 kafka服务端中min.insync.replicas。 如果我们不设置的话,默认这个值是1。一个leader partition会维护一个ISR列表,这个值就是限制ISR列表里面 至少得有几个副本,比如这个值是2,那么当ISR列表里面只有一个副本的时候,往这个分区插入数据的时候会报错。
Kafka高可用
高可用的由来
为何需要Replication
- Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。
- 如果Producer使用同步模式则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该Broker的数据的丢失。
- 如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。
- 由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。
Leader Election(选举机制)
- 引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。
- 因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。
Kafka HA设计解析
如何将所有Replica均匀分布到整个集群
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
- 将所有Broker(假设共n个Broker)和待分配的Partition排序。
- 将第i个Partition分配到第(i mod n)个Broker上。
- 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上。
Data Replication(副本策略)
Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。
1.消息传递同步策略
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。
Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。
Kafka Replication的数据流如下图所示:
2.ACK前需要保证有多少个备份
对于Kafka而言,定义一个Broker是否”活着”包含两个条件:
- 一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。
- 二是Follower必须能够及时将Leader的消息复制过来,不能”落后太多”。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的”落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决fail/recover,不处理”Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
3.Leader Election算法
Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:
- 节点名称唯一性: 多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁。
- 临时顺序节点: 所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁。
一种非常常用的选举leader的方式是”Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的Replica,如果要容忍2个Follower挂掉,必须要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种方式。
Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。
虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。
4.如何处理所有Replica都不工作
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中的任一个Replica”活”过来,并且选它作为Leader
- 选择第一个”活”过来的Replica(不一定是ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个简单的折中。如果一定要等待ISR中的Replica”活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法”活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个”活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
5.选举Leader
最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
但是该方法会有3个问题:
- split-brain: 这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica”看”到的状态是一样的,这就可能造成不同Replica的响应不一致
- herd effect: 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
- ZooKeeper负载过重: 每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。
Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
AR,ISR,LEO,HW
- AR: Assigned Replicas的缩写,是每个partition下所有副本(replicas)的统称;
- ISR: 副本同步队列(In-Sync Replicas)的缩写,是指副本同步队列,ISR是AR中的一个子集;
- LEO: 日志末端位移(Log End Offset)的缩写,表示每个partition的log最后一条Message的位置。新消息写入时将分配的偏移量(Offset)值,从0开始,随着消息不断写入递增。
- HW: 高水位(High Watermark)的缩写,是指consumer能够看到的此partition的位置。 取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。
kafka 中为了防止 log 文件过大导致数据定位效率低下而采取了分片和索引机制,将每个物理上的 partition 分为多个 segment。每个 segment 对应两个文件—“.index”文件和”.log”文件。“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。
但是对于上层应用来说,可以将partition看成最小的存储单元(一个由多个segment文件拼接而成的”巨型”文件),每个partition都由一些列有序的,不可变的消息组成,这些消息被连续的追加到partition中。
ISR和AR
ISR (In-Sync Replicas),这个是指副本同步队列。副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 从 0.9.0.0 版本后中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
为什么在Kafka 0.9.0.0版本后移除了replica.lag.max.messages参数而只保留了replica.lag.time.max.ms作为ISR中副本管理的参数呢?
replica.lag.max.messages表示当前某个副本落后leader的消息数量超过了这个参数的值,那么leader就会把follower从ISR中删除。假设设置replica.lag.max.messages=4,那么如果producer一次传送至broker的消息数量都小于4条时,因为在leader接受到producer发送的消息之后而follower副本开始拉取这些消息之前,follower落后leader的消息数不会超过4条消息,故此没有follower移出ISR,所以这时候replica.lag.max.message的设置似乎是合理的。但是producer发起瞬时高峰流量,producer一次发送的消息超过4条时,也就是超过replica.lag.max.messages,此时follower都会被认为是与leader副本不同步了,从而被踢出了ISR。但实际上这些follower都是存活状态的且没有性能问题。那么在之后追上leader,并被重新加入了ISR。于是就会出现它们不断地剔出ISR然后重新回归ISR,这无疑增加了无谓的性能损耗。而且这个参数是broker全局的。设置太大了,影响真正”落后”follower的移除;设置的太小了,导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值,故此,新版本的Kafka移除了这个参数。
HW和LEO
上面有简单说到HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置;而LEO是LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。也就是,我们取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。消费者能消费的数据 = [LW,HW)。
每个replica都有自己的HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。
事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:** /brokers/topics/[topic]/partitions/[partition]/state**
目前有两个地方会对这个Zookeeper的节点进行维护:
- Controller来维护: Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
- leader来维护: leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。
HA相关ZooKeeper结构
admin
- 该目录下znode只有在有相关操作时才会存在,操作结束时会将其删除
- /admin/reassign_partitions用于将一些Partition分配到不同的broker集合上。对于每个待重新分配的Partition,Kafka会在该znode上存储其所有的Replica和相应的Broker id。该znode由管理进程创建并且一旦重新分配成功它将会被自动移除。
HA broker
- 即/brokers/ids/[brokerId])存储”活着”的broker信息。
- topic注册信息(/brokers/topics/[topic]),存储该topic的所有partition的所有replica所在的broker id,第一个replica即为preferred replica,对一个给定的partition,它在同一个broker上最多只有一个replica,因此broker id可作为replica id。
controller
- /controller -> int (broker id of the controller)存储当前controller的信息
- /controller_epoch -> int (epoch)直接以整数形式存储controller epoch,而非像其它znode一样以JSON字符串形式存储。
producer发布消息
写入方式
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 partition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
- 指定了 partition,则直接使用;
- 未指定 partition 但指定 key,通过对 key 的 value 进行hash 选出一个
- partition 和 key 都未指定,使用轮询选出一个 partition。
写入流程
producer 写入消息序列图如下所示:
流程说明:
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader。
- producer 将消息发送给该 leader。
- leader 将消息写入本地 log。
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK。
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK。
broker保存消息
存储方式
物理上把 topic 分成一个或多个 partition(对应 server.properties 中的 num.partitions=3 配置),每个 partition 物理上对应一个文件夹(该文件夹存储该 partition 的所有消息和索引文件),如下:
存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
基于时间: log.retention.hours=168
基于大小: log.retention.bytes=1073741824
Topic的创建和删除
创建topic
创建 topic 的序列图如下所示:
流程说明:
- controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
- 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
- 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
- controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest
删除topic
删除 topic 的序列图如下所示:
流程说明:
- controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
broker failover
kafka broker failover 序列图如下所示:
流程说明:
- controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
- controller 从 /brokers/ids 节点读取可用broker
- controller决定set_p,该集合包含宕机 broker 上的所有 partition
- 对 set_p 中的每一个 partition
- 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
- 决定新 leader
- 将新 leader,ISR,controller_epoch 和 leader_epoch 等信息写入 state 节点
- 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
controller failover
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:
- 读取并增加 Controller Epoch。
- 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
- 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
- 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
- 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
- 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
- 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
- 启动 replicaStateMachine 和 partitionStateMachine。
- 将 brokerState 状态设置为 RunningAsController。
- 将每个 partition 的 Leadership 信息发送给所有”活”着的 broker。
- 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
- 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
Kafka在zookeeper中存储结构图
topic注册信息
- /brokers/topics/[topicName] :
- 存储某个topic的partitions所有分配信息。
我们输入zkCli.sh进入zookeeper客户端。
使用: get /brokers/topics/topic-test,可以看到某个topic的存储信息。
partition状态信息
- /brokers/topics/[topicName]/partitions/[0…N] 其中[0..N]表示partition索引号
- /brokers/topics/[topicName]/partitions/[partitionId]/state
“controller_epoch”: 表示kafka集群中的中央控制器选举次数,
“leader”: 表示该partition选举leader的brokerId,
“version”: 版本编号默认为1,
“leader_epoch”: 该partition leader选举次数,
“isr”: [同步副本组brokerId列表]
Broker注册信息
- /brokers/ids/[0…N]
- 每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)
“jmx_port”: jmx端口号,
“timestamp”: kafka broker初始启动时的时间戳,
“host”: 主机名或ip地址,
“version”: 版本编号默认为1,
“port”: kafka broker的服务端端口号,由server.properties中参数port确定
Controller epoch
- /controller_epoch —> int (epoch)
- 此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;
Controller注册信息
- /controller -> int (broker id of the controller) 存储center controller中央控制器所在kafka broker的信息
“version”: 版本编号默认为1,
“brokerid”: kafka集群中broker唯一编号,
“timestamp”: kafka broker中央控制器变更时的时间戳