Kafka手册
- 14. Kafk手册
- 14.1. 初级
- 14.2. 中级
- 14.2.1. Kafka 的设计架构?
- 14.2.2. 零拷贝技术
- 14.2.3. 高性能高吞吐
- 14.2.4. Kafka 分区的目的?
- 14.2.5. 说一下什么是副本?
- 14.2.6. Kafka消息是采用Pull模式,还是Push模式?
- 14.2.7. kafka消息队列优点或者对比
- 14.2.8. Kafka的设计是什么样的呢?
- 14.2.9. Kafka判断一个节点是否还活着有那两个条件?
- 14.2.10. Kafa consumer是否可以消费指定分区消息?
- 14.2.11. producer是否直接将数据发送到broker的leader(主节点)?
- 14.2.12. Kafka存储在硬盘上的消息格式是什么?
- 14.2.13. Kafka高效文件存储设计特点:
- 14.2.14. Kafka工作流程
- 14.2.15. 生产者向 Kafka 发送消息的执行流程介绍一下?
- 14.2.16. Producer 发送的一条 message 中包含哪些信息?
- 14.2.17. kafka 如何实现多线程的消费?
- 14.2.18. 如何保证Kafka的消息有序
- 14.2.19. kafka 如何保证数据的不重复和不丢失?
- 14.2.20. kafka如何保证对应类型数据写入相同的分区
- 14.2.21. Kafka创建Topic时如何将分区放置到不同的Broker中
- 14.2.22. Kafka新建的分区会在哪个目录下创建
- 14.2.23. partition的数据如何保存到硬盘
- 14.2.24. Kafka的消费者如何消费数据
- 14.2.25. kafaka生产数据时数据的分组策略
- 14.2.26. kafka集群架构
- 14.2.27. Kafka的工作机制
- 14.2.28. kafka文件存储结构
- 14.2.29. kafka应用场景
- 14.2.30. kafka生产者写入数据
- 14.2.31. kafka写入数据可靠性保障
- 14.2.32. kafka的ack机制
- 14.2.33. kafka 事务了解吗?
- 14.2.34. kafka有那些分区算法
- 14.2.35. kafka消费者
- 14.2.36. Rebalance (重平衡)
- 14.2.37. 日志索引
- 14.2.38. 解释如何减少ISR中的扰动?broker什么时候离开ISR?
- 14.2.39. ISR、OSR、AR 是什么?
- 14.2.40. LEO、HW、LSO、LW等分别代表什么?
- 14.2.41. 如何进行 Leader 副本选举?
- 14.2.42. 如何进行 broker Leader 选举?
- 14.2.43. Kafka为什么需要复制?
- 14.2.44. Kafka 的高可靠性是怎么实现的?
- 14.2.45. Kafka 分区数可以增加或减少吗?为什么?
- 14.2.46. Kafka消息可靠性的保证
- 14.2.47. 为什么kafka中1个partition只能被同组的一个consumer消费?
- 14.2.48. kafka和zookeeper的关系
- 14.2.49. zookeeper在kafka中的作用
- 14.2.50. Kafka服务器能接收到的最大信息是多少?
- 14.2.51. Kafka中的ZooKeeper是什么?Kafka是否可以脱离ZooKeeper独立运行?
- 14.2.52. Kafka的高性能的原因
- 14.2.53. kafka broker 挂了怎么办
- 14.2.54. 关于kafka的isr机制
- 14.2.55. Exactly Once语义
14. Kafk手册
14.1. 初级
14.1.1. 请说明什么是Apache Kafka?
Kafka 是由 Linkedin
公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
14.1.2. Kafka的基本术语
消息:Kafka 中的数据单元被称为消息
,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率, 消息会分批次
写入 Kafka,批次就代指的是一组消息。
主题:消息的种类称为 主题
(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
offset:偏移量,kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafkaOffset
Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
Producer:消息生产者,向kafka broker发消息的客户端
Consumer:消息消费者,向kafka broker取消息的客户端
Group消费者组:这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic
生产者: 向主题发布消息的客户端应用程序称为生产者
(Producer),生产者用于持续不断的向某个主题发送消息。
消费者:订阅主题消息的客户端程序称为消费者
(Consumer),消费者用于处理生产者产生的消息。
消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组
(Consumer Group)指的就是由一个或多个消费者组成的群体。
偏移量:偏移量
(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
broker: 一个独立的 Kafka 服务器就被称为 broker
,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 集群:broker 是集群
的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器
的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本
(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。或者说某一个消费者加入一个消费者组,那么就需要重新平衡消息的发送,Rebalance 是 Kafka 消费者端实现高可用的重要手段。
14.1.3. 请说明什么是传统的消息传递方法?
传统的消息传递方法包括两种:
- 队列:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人,某个消费者消费数据之后,消息会随机删除。
- 发布-订阅:在这个模型中,消息被广播给所有的用户,可以有多个生产者和多个消费者。
14.1.4. 数据传输的事务有几种?
数据传输的事务定义通常有以下三种级别:
- 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输,这种情况会发生数据的丢失。
- 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
- 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的,端到端的精确一次性最难保证。
14.1.5. 使用消息队列的好处
- 解耦
- 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 可恢复性
- 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 缓冲
- 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 灵活性 &峰值处理能力
- 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 异步通信
- 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
14.1.6. 为什么选择kafka
14.1.6.1. kafka由多个生产者
KafKa 可以无缝地支持多个生产者,不管客户端使用一个主题,还是多个主题。Kafka 适合从多个前端系统收集数据,并以统一的格式对外提供数据。
14.1.6.2. 多个消费者
Kafka 支持多个消费者从一个单独的消息流中读取数据,并且消费者之间互不影响。这与其他队列系统不同,其他队列系统一旦被客户端读取,其他客户端就不能 再读取它。并且多个消费者可以组成一个消费者组,他们共享一个消息流,并保证消费者组对每个给定的消息只消费一次。
14.1.6.3. 基于磁盘的消息存储
Kafka 允许消费者非实时地读取消息,原因在于 Kafka 将消息提交到磁盘上,设置了保留规则进行保存,无需担心消息丢失等问题。
14.1.6.4. 伸缩性
可扩展多台 broker。用户可以先使用单个 broker,到后面可以扩展到多个 broker。
14.1.6.5. 高性能
Kafka 可以轻松处理百万千万级消息流,同时还能保证 亚秒级 的消息延迟。
14.1.7. Kafka生产者分区策略
- 指明 partition 的情况下,直接将指明的值直接作为partiton值。
- 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值。
- 既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
14.1.8. Kafka 缺点?
- 由于是批量发送,数据并非真正的实时
- 对于mqtt协议不支持
- 不支持物联网传感数据直接接入
- 仅支持统一分区内消息有序,无法实现全局消息有序
- 监控不完善,需要安装插件
- 依赖zookeeper进行元数据管理。
14.1.9. Kafka消息队列
Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式
14.1.9.1. 点对点模式
Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式.
点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
- 消费者去队列中拉去消息,即使有多个消费者,那生产者也会把消息发送到队列中,多个消费者轮询到队列中拉去消息。
14.1.9.2. 发布订阅模式
发布订阅模式有两种形式,一种是消费者主动拉取数据的形式,另一种是生产者推送消息的形式,kafka是基于发布订阅模式中消费者拉取的方式。消费者的消费速度可以由消费者自己决定。但是这种方式也有缺点,当没有消息的时候,kafka的消费者还需要不停的访问kafka生产者拉取消息,浪费资源。
- 这种方式中,消费者和生产者都可以有多个。
14.1.10. 请简述下你在哪些场景下会选择 Kafka?
日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等
活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
流式处理:比如spark streaming和 Flink
14.1.11. 如何保证每个应用程序都可以获取到 Kafka 主题中的所有消息,而不是部分消息?
为每个应用程序创建一个消费者组,然后往组中添加消费者来伸缩读取能力和处理能力,每个群组消费主题中的消息时,互不干扰。
本质上是实现广播的功能。
14.2. 中级
14.2.1. Kafka 的设计架构?
简单的设计架构
Kafak 总体架构图中包含多个概念:
(1)ZooKeeper:Zookeeper
负责保存 broker
集群元数据,并对控制器进行选举等操作。
(2)Producer:生产者负责创建消息,将消息发送到 Broker。
(3)Broker: 一个独立的 Kafka
服务器被称作 broker
,broker 负责接收来自生产者的消息,为消息设置偏移量,并将消息存储在磁盘。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
(4)Consumer:消费者负责从 Broker
订阅并消费消息。
(5)Consumer Group:Consumer Group
为消费者组,一个消费者组可以包含一个或多个 Consumer
。
使用 多分区 + 多消费者 方式可以极大 提高数据下游的处理速度,同一消费者组中的消费者不会重复消费消息
,同样的,不同消费组中的消费者消费消息时互不影响。Kafka 就是通过消费者组的方式来实现消息 P2P 模式和广播模式。
(6)Topic:Kafka 中的消息 以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
(7)Partition:一个 Topic 可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的 日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的 偏移量(offset)。
(8)Offset:offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka保证的是分区有序性而不是主题有序性。
(9)Replication:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
(10)Record:实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了key
、value
和 timestamp
。
(11)Leader: 每个分区多个副本的 "主" leader,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
(12)follower: 每个分区多个副本中的"从" follower,实时从 Leader 中同步数据,保持和 leader 数据的同步。Leader 发生故障时,某个 follow 会成为新的 leader。
14.2.2. 零拷贝技术
从 Kafka 里经常要消费数据,那么消费的时候实际上就是要从 kafka 的磁盘文件里读取某条数据然后发送给下游的消费者,如下图所示。
那么这里如果频繁的从磁盘读数据然后发给消费者,会增加两次没必要的拷贝,如下图:
一次是从操作系统的 cache 里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。
而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。
Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。
也就是说,直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存,如下图所示:
通过 零拷贝技术,就不需要把 os cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。
对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
Kafka 从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。
kafka 集群经过良好的调优,数据直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。相当于 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。
传统IO流程:
第一次:将磁盘文件,读取到操作系统内核缓冲区。
第二次:将内核缓冲区的数据,copy到application应用程序的buffer。
第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区)
第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。
传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。
重新思考传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。
显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。
所以零拷贝是指读取磁盘文件后,不需要做其他处理,直接用网络发送出去。
实现过程:
简单的来说,比如用户读取一个文件,那么首先文件会读取到内存的缓冲区,然后在从缓冲区通过网络将数据发送给用户。可以抽象为下面两部:
read(file, tmp_buf, len);
write(socket, tmp_buf, len);
但是实际上,中间经历了四个过程,因为读取文件需要在用户态和和心态之间相互转换:
- 程序调用 read 产生一次用户态到内核态的上下文切换。DMA 模块从磁盘读取文件内容,将其拷贝到内核空间的缓冲区,完成第 1 次拷贝。
- 数据从内核缓冲区拷贝到用户空间缓冲区,之后系统调用 read 返回,这回导致从内核空间到用户空间的上下文切换。这个时候数据存储在用户空间的 tmp_buf 缓冲区内,可以后续的操作了。
- 程序调用 write 产生一次用户态到内核态的上下文切换。数据从用户空间缓冲区被拷贝到内核空间缓冲区,完成第 3 次拷贝。但是这次数据存储在一个和 socket 相关的缓冲区中,而不是第一步的缓冲区。
- write 调用返回,产生第 4 个上下文切换。第 4 次拷贝在 DMA 模块将数据从内核空间缓冲区传递至协议引擎的时候发生,这与我们的代码的执行是独立且异步发生的。你可能会疑惑:“为何要说是独立、异步?难道不是在 write 系统调用返回前数据已经被传送了?write 系统调用的返回,并不意味着传输成功——它甚至无法保证传输的开始。调用的返回,只是表明以太网驱动程序在其传输队列中有空位,并已经接受我们的数据用于传输。可能有众多的数据排在我们的数据之前。除非驱动程序或硬件采用优先级队列的方法,各组数据是依照FIFO的次序被传输的(上图中叉状的 DMA copy 表明这最后一次拷贝可以被延后)。
Mmap
上面的数据拷贝非常多,我们可以减少一些重复拷贝来减少开销,提升性能。某些硬件支持完全绕开内存,将数据直接传送给其他设备。这个特性消除了系统内存中的数据副本,因此是一种很好的选择,但并不是所有的硬件都支持。此外,来自于硬盘的数据必须重新打包(地址连续)才能用于网络传输,这也引入了某些复杂性。为了减少开销,我们可以从消除内核缓冲区与用户缓冲区之间的拷贝开始。
减少数据拷贝的一种方法是将 read 调用改为 mmap。例如:
tmp_buf = mmap(file, len);
write(socket, tmp_buf, len);
mmap 调用导致文件内容通过 DMA 模块拷贝到内核缓冲区。然后与用户进程共享缓冲区,这样不会在内核缓冲区和用户空间之间产生任何拷贝。
write 调用导致内核将数据从原始内核缓冲区拷贝到与 socket 关联的内核缓冲区中。
第 3 次数据拷贝发生在 DMA 模块将数据从 socket 缓冲区传递给协议引擎时。
14.2.3. 高性能高吞吐
14.2.3.1. 页缓存技术
Kafka
是基于 操作系统 的页缓存
来实现文件写入的。
操作系统本身有一层缓存,叫做 page cache,是在 内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。
Kafka 在写入磁盘文件的时候,可以直接写入这个 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入磁盘文件中。通过这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,原理图如下:
14.2.3.2. 顺序写
另一个主要功能是 kafka 写数据的时候,是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据
。
普通的机械磁盘如果你要是随机写的话,确实性能极差,也就是随便找到文件的某个位置来写数据。
但是如果你是 追加文件末尾 按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能相差无几。
基于上面两点,kafka 就实现了写入数据的超高性能。
14.2.3.3. 批发送
批处理是一种常用的用于提高I/O性能的方式. 对Kafka而言, 批处理既减少了网络传输的Overhead, 又提高了写磁盘的效率. Kafka 0.82 之后是将多个消息合并之后再发送, 而并不是send一条就立马发送(之前支持)
# 批量发送的基本单位, 默认是16384Bytes, 即16kB
batch.size
# 延迟时间
linger.ms
# 两者满足其一便发送
14.2.3.4. 数据压缩
数据压缩的一个基本原理是, 重复数据越多压缩效果越好. 因此将整个Batch的数据一起压缩能更大幅度减小数据量, 从而更大程度提高网络传输效率。
Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘, Consumer接受到压缩后的数据再解压缩,
整体来讲: Producer 到 Broker, 副本复制, Broker 到 Consumer 的数据都是压缩后的数据, 保证高效率的传输
14.2.3.5. 零拷贝技术
14.2.4. Kafka 分区的目的?
分区对于Kafka 集群的好处是:实现负载均衡。
分区对于消费者来说,可以提高并发度,提高效率,当然,对于是生产者也可以提高并行度的目的,将数据写入多个分区,提高写入速度,各个分区都是独立的,不会产生并发安全问题。
如果我们假设像标准 MQ 的 Queue, 为了保证一个消息只会被一个消费者消费, 那么我们第一想到的就是加锁. 对于发送者, 在多线程并且非顺序写环境下, 保证数据一致性, 我们同样也要加锁. 一旦考虑到加锁, 就会极大的影响性能.
我们再来看Kafka 的 Partition, Kafka 的消费模式和发送模式都是以 Partition 为分界. 也就是说对于一个 Topic 的并发量限制在于有多少个 Partition, 就能支撑多少的并发. 可以参考 Java 1.7 的 ConcurrentHashMap 的桶设计, 原理一样, 有多少桶, 支持多少的并发
14.2.5. 说一下什么是副本?
kafka 为了保证数据不丢失,从 0.8.0
版本开始引入了分区副本机制。在创建 topic 的时候指定 replication-factor
,默认副本为 3 。
副本是相对 partition 而言的,一个分区中包含一个或多个副本,其中一个为leader
副本,其余为follower
副本,各个副本位于不同的 broker
节点中。
所有的读写操作
都是经过 Leader 进行的,同时 follower 会定期地去 leader 上复制数据。当 Leader 挂掉之后,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
14.2.6. Kafka消息是采用Pull模式,还是Push模式?
Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。
一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式
Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略
Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发。
14.2.7. kafka消息队列优点或者对比
14.2.7.1. 请说明Kafka相对于传统的消息传递方法有什么优势?
- 高性能:单一的Kafka代理可以处理成千上万的客户端,每秒处理数兆字节的读写操作,Kafka性能远超过传统的ActiveMQ、RabbitMQ等,而且Kafka支持Batch操作
- 可扩展:Kafka集群可以透明的扩展,增加新的服务器进集群
- 容错性: Kafka每个Partition数据会复制到几台服务器,当某个Broker失效时,Zookeeper将通知生产
者和消费者从而使用其他的Broker。 - 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
- 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
- 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
14.2.7.2. Kafka与传统消息队列的区别?
在说区别的时候,我们先来看看kafka的应用场景:
kafka是个日志处理缓冲组件,在大数据信息处理中使用。和传统的消息队列相比较简化了队列结构和功能,以流形式处理存储(持久化)消息(主要是日志)。日志数据量巨大,处理组件一般会处理不过来,所以作为缓冲曾的kafka,支持巨大吞吐量。为了防止信息丢失,其消息被消费后不直接丢弃,要多存储一段时间,等过期时间过了才丢弃。这是mq和redis不能具备的。
主要特点入下:
巨型存储量:
- 支持TB甚至PB级别数据。
高吞吐,高IO:
- 一般配置的服务器能实现单机每秒100K条以上消息的传输。
消息分区,分布式消费:
- 首先kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区,这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。也做到了负载均衡的目的,将数据均衡到堕胎服务器的多个分区中。
- 能保消息顺序传输。 支持离线数据处理和实时数据处理。
Scale out:
- 支持在线水平扩展,以支持更大数据处理量。
高可用机制:
- 其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。
- 然后保证分区内部消息的消费有序性。
消费者组:
- Kafka还具有consumer group的概念,每个分区只能被同一个group的一个consumer消费,但可以被多个group消费。
而传统的消息队列,比如Rides
redis只是提供一个高性能的、原子操作内存键值队,具有高速访问能力,可用做消息队列的存储,但是不具备消息队列的任何功能和逻辑,要作做为消息队列来实现的话,功能和逻辑要通过上层应用自己实现。
redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送,并不保证可靠。
作为消息队列来说,企业中选择mq的还是多数,因为像Rabbit,Rocket等mq中间件都属于很成熟的产品,性能一般但可靠性较强,而kafka原本设计的初衷是日志统计分析,现在基于大数据的背景下也可以做运营数据的分析统计,而redis的主要场景是内存数据库,作为消息队列来说可靠性太差,而且速度太依赖网络IO,在服务器本机上的速度较快,且容易出现数据堆积的问题,在比较轻量的场合下能够适用
还由MQ
我们以是RabbitMQ为例介绍。它是用Erlang语言开发的开源的消息队列,支持多种协议,包括AMQP,XMPP, SMTP, STOMP。适合于企业级的开发。
MQ支持Broker构架,消息发送给客户端时需要在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
14.2.8. Kafka的设计是什么样的呢?
Kafka将消息以topic为单位进行归纳
将向Kafka topic发布消息的程序成为producers.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息
将预订topics并消费消息的程序成为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
14.2.9. Kafka判断一个节点是否还活着有那两个条件?
节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接
如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久,注意,这里的同步,同步的是isr中的follower节点。
14.2.10. Kafa consumer是否可以消费指定分区消息?
Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。
14.2.11. producer是否直接将数据发送到broker的leader(主节点)?
producer直接将数据发送到topic的leader(主节点),不需要在多个节点进行分发,为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。
14.2.12. Kafka存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。
- 消息长度: 4 bytes (value: 1+4+n)
- 版本号: 1 byte
- CRC校验码: 4 bytes
- 具体的消息: n bytes
14.2.13. Kafka高效文件存储设计特点:
- Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用,健儿来说就是通过一种分而治之的方法,逐步清除没有用的小文件。
- 通过索引信息可以快速定位message和确定response的最大大小。
- 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
- 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
14.2.14. Kafka工作流程
- 消息分类按不同类别,分成不同的Topic,Topic⼜又拆分成多个partition,每个partition均衡分散到不同的服务器器(提高并发访问的能力)
- 消费者按顺序从partition中读取,不支持随机读取数据,但可通过改变保存到zookeeper中的offset位置实现从任意位置开始读取。
- 服务器消息定时清除(不管有没有消费)
- 每个partition还可以设置备份到其他服务器上的个数以保证数据的可⽤性。通过Leader,Follower方式。
- zookeeper保存kafka服务器和客户端的所有状态信息.(确保实际的客户端和服务器轻量级)
- 在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,⼀个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过⼀个consumer可以消费多个partitions中的消息。
- 如果所有的consumer都具有相同的group(也就是所有的消费者再同一个消费者组),这种情况和queue模式很像;消息将会在consumers之间负载均衡.
- 如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
- 持久性,当收到的消息时先buffer起来,等到了一定的阀值再写入磁盘文件,减少磁盘IO.在一定程度上依赖OS的文件系统(对文件系统本身优化几乎不可能),所以在这个过程中,如果断电的话,存在消息的丢失。
- 除了磁盘IO,还应考虑网络IO,批量对消息发送和接收,并对消息进行压缩。
- 在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这种模式有些优点,首先consumer端可以根据⾃己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.
- kafka无需记录消息是否接收成功,是否要重新发送等,所以kafka的producer是非常轻量级的,consumer端也只需要将fetch后的offset位置注册到zookeeper,所以也是非常轻量级的.
14.2.15. 生产者向 Kafka 发送消息的执行流程介绍一下?
- 生产者要往 Kafka 发送消息时,需要创建 ProducerRecoder,代码如下:
ProducerRecord<String,String> record
= new ProducerRecoder<>("CostomerCountry","Precision Products","France");
try{
producer.send(record);
}catch(Exception e){
e.printStackTrace();
}
ProducerRecoder 对象会包含目标 topic,分区内容,以及指定的 key 和 value,
在发送 ProducerRecoder 时,生产者会先把键和值对象序列化成字节数组
,然后在网络上传输。生产者在将
消息
发送到某个 Topic ,需要经过拦截器、序列化器和分区器(Partitioner)。如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
- 若没有指定分区,且消息的 key 不为空,则使用 murmur 的 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。
- 若没有指定分区,且消息的 key 也是空,则用轮询的方式选择一个分区。
分区选择好之后,会将消息添加到一个记录批次中,这个批次的所有消息都会被发送到相同的 Topic 和 partition 上。然后会有一个独立的线程负责把这些记录批次发送到相应的 broker 中。
broker 接收到 Msg 后,会作出一个响应。如果成功写入 Kafka 中,就返回一个RecordMetaData 对象,它包含
Topic
和Partition
信息,以及记录在分区的offset
。若写入失败,就返回一个错误异常,生产者在收到错误之后尝试重新发送消息,几次之后如果还失败,就返回错误信息。
14.2.16. Producer 发送的一条 message 中包含哪些信息?
消息由 可变长度
的 报头、可变长度
的 不透明密钥字节数组和 可变长度
的 不透明值字节数组 组成。
RecordBatch 是 Kafka 数据的存储单元,一个 RecordBatch 中包含多个 Record
(即我们通常说的一条消息)。
一个 RecordBatch 中可以包含多条消息,即上图中的 Record,而每条消息又可以包含多个 Header 信息,Header 是 Key-Value 形式的。
14.2.17. kafka 如何实现多线程的消费?
kafka 允许同组的多个 partition 被一个 consumer 消费,但不允许一个 partition 被同组的多个 consumer 消费。
实现多线程步骤如下:
- 生产者随机分区提交数据(自定义随机分区)。
- 消费者修改单线程模式为多线程,在消费方面得注意,得遍历所有分区,否则还是只消费了一个区。
14.2.18. 如何保证Kafka的消息有序
Kafka只能保证一个partition中的消息被某个consumer消费时是顺序的,事实上,从Topic角度来说,
当有多个partition时,消息仍然不是全局有序的。
生产者:通过分区的 leader 副本负责数据以先进先出的顺序写入,来保证消息顺序性。
在这里也需要说一下,如果是一个消费者组中的多个消费者去消费一个分区的话,就不能保证消费数据时候的消费顺序,并且会产生并发安全问题。
消费者:同一个分区内的消息只能被一个 group 里的一个消费者消费,保证分区内消费有序。
kafka 每个 partition 中的消息在写入时都是有序的,消费时, 每个 partition 只能被每一个消费者组中的一个消费者消费,保证了消费时也是有序的。
整个 kafka 不保证有序。如果为了保证 kafka 全局有序,那么设置一个生产者,一个分区,一个消费者。
14.2.19. kafka 如何保证数据的不重复和不丢失?
- exactly once 模式 精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。
kafka 默认的模式是 at least once ,但这种模式可能会产生重复消费的问题,所以在业务逻辑必须做幂等设计。
使用 exactly Once + 幂等操作,可以保证数据不重复,不丢失。
14.2.20. kafka如何保证对应类型数据写入相同的分区
通过 消息键 和 分区器 来实现,分区器为键
生成一个 offset
,然后使用 offset 对主题分区进行取模,为消息选取分区,这样就可以保证包含同一个键的消息会被写到同一个分区上。
- 如果
ProducerRecord
没有指定分区,且消息的key 不为空
,则使用Hash 算法
(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。 - 如果 ProducerRecord 没有指定分区,且消息的
key 也是空
,则用 轮询 的方式选择一个分区。
14.2.21. Kafka创建Topic时如何将分区放置到不同的Broker中
- 副本因子不能大于 Broker 的个数;
- 第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
- 其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
- 剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的。
14.2.22. Kafka新建的分区会在哪个目录下创建
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。
当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。
如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。
但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?
答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。
14.2.23. partition的数据如何保存到硬盘
topic中的多个partition以文件夹的形式保存到broker(也就是每一个kafka的服务器上),每个分区序号从0递增,且消息有序,Partition文件下有多个segment(xxx.index,xxx.log),segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为1g,如果大小大于1g时,会滚动一个新的segment并且以上一个segment最后一条消息的偏移量命名。
14.2.24. Kafka的消费者如何消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置,等到下次消费时,他会接着上次位置继续消费。
这个offset的位置是由zookeeper来维护的。
14.2.25. kafaka生产数据时数据的分组策略
生产者决定数据产生到集群的哪个partition中
每一条消息都是以(key,value)格式
Key是由生产者发送数据传入
所以生产者(key)决定了数据产生到集群的哪个partition
14.2.26. kafka集群架构
14.2.27. Kafka的工作机制
上图表示kafka
集群有一个topic A
,并且有三个分区,分布在三个节点上面。
注意点:每个分区有两个副本,两个副本分别是leader,follower
,并且每一个副本一定不和自己的leader
分布在一个节点上面。Kafka
中消息是以 **topic
**进行分类的,生产者生产消息,消费者消费消息,都是面向 topic
的。
topic
是逻辑上的概念,而partition
是物理上的概念,每个 partition
对应于一个 log
文件,该 log
文件中存储的就是 producer
生产的数据。Producer
生产的数据会被不断追加到该log
文件末端,且每条数据都有自己的 offset
。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset
,以便出错恢复时,从上次的位置继续消费。每一个分区内部的数据是有序的,但是全局不是有序的。
14.2.28. kafka文件存储结构
由于生产者生产的消息会不断追加到log
文件末尾,为防止 log
文件过大导致数据定位效率低下,Kafka
采取了分片和索引机制,将每个 partition
分为多个 segment
。每个 segment
对应两个文件“.index”
文件和“.log”
文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号
。例如,first
这个 topic
有三个分区,则其对应的文件夹为 :
first-0,first-1,first-2
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
//index文件中存储的是每一个log文件的起始数据的偏移量
index和 log
文件以当前 segment
的第一条消息的 offset
命名。下图为 index
文件和 log
文件的结构示意图
上面kafka
再查找偏移量的时候是以二分查找法进行查找的。也就是查询index的时候使用的是二分查找法。
查找原理是:文件头的偏移量和文件大小快速定位。“.index”
文件存储大量的索引信息,在查找index
的时候使用的是二分查找法,“.log”
文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message
的物理偏移地址。
log、index、timeindex 中存储的都是
二进制
的数据( log 中存储的是 BatchRecords 消息内容,而 index 和 timeindex 分别是一些索引信息。)
14.2.29. kafka应用场景
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势,
kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(⽐如,消息重发,消息发送丢失等)
kafka的特性决定它非常适合作为"日志收集中心";
application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;
kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.
14.2.30. kafka生产者写入数据
14.2.30.1. 副本
同一个partition
可能会有多个replication
(对应server.properties
配置中default.replication.factor=N)没有replication
的情况下,一旦broker
宕机,其上所有 patition
的数据都不可被消费,同时producer也不能再将数据存于其上的patition
。引入replication
之后,同一个partition
可能会有多个replication
,而这时需要在这些replication
之间选出一个leader
,producer
和consumer
只与这个leader
交互,其它replication
的follower
从leader 中复制数据,保证数据的一致性。
14.2.30.2. 写入方式
producer
采用推(push)
模式将消息发布到broker
,每条消息都被追加(append)
到分区(patition)
中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka
吞吐率)。
- producer先从
zookeeper
的"/brokers/.../state"
节点找到该partition
的leader
producer
将消息发送给该leader
leader
将消息写入本地log
followers
从leader pull
消息,写入本地log
后向leader
发送ACK
leader
收到所有ISR
中的replication
的ACK
后,增加HW(high watermark,最后commit 的offset)
并向producer
发送ACK
14.2.30.3. broker保存消息
存储方式:物理上把topic
分成一个或多个patition
,每个patition
物理上对应一个文件夹(该文件夹存储该patition
的所有消息和索引文件)
14.2.30.4. 存储策略
无论消息是否被消费,kafka
都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:
log.retention.hours=168
- 基于大小:
log.retention.bytes=1073741824
需要注意的是,因为Kafka
读取特定消息的时间复杂度为O(1)
,即与文件大小无关,所以这里删除过期文件与提高Kafka
性能无关
14.2.30.5. 分区
消息发送时都被发送到一个topic
,其本质就是一个目录,而topic
是由一些Partition Logs
(分区日志)组成,其组织结构如下图所示:
我们可以看到,每个Partition
中的消息都是有序的,生产的消息被不断追加到Partition log
上,其中的每一个消息都被赋予了一个唯一的offset
值。
- 分区的原因
- 方便在集群中扩展,每个
Partition
可以通过调整以适应它所在的机器,而一个topic
又可以有多个Partition
组成,因此整个集群就可以适应任意大小的数据了; - 可以提高并发,因为可以以
Partition
为单位读写了。
- 方便在集群中扩展,每个
- 分区的原则
- 指定了
patition
,则直接使用; - 未指定
patition
但指定key
,通过对key
的value
进行hash
出一个patition
; - 既没有
partition
值又没有key
值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic
可用的partition
总数取余得到partition
值,也就是常说的round-robin
算法(轮询算法)。.
- 指定了
14.2.31. kafka写入数据可靠性保障
produce写入消息的可靠性保证
数据写入的可靠性保证。
为保证 producer
发送的数据,能可靠的发送到指定的 topic
,topic
的每个 partition
收到producer
发送的数据后,都需要向 producer
发送 ack(acknowledgement确认收到)
,如果producer
收到 ack
,就会进行下一轮的发送,否则重新发送数据。
- 副本数据同步策略
方案 | 优点 | 缺点 |
---|---|---|
半数以上同步完成,发送ack 确认 | 延迟低 | 选举新的节点时,容忍n 个节点故障,需要2n+1 个副本 |
全部同步完成以后,才发送ack 确认 | 选举新的leader 时,容忍n 台节点故障,需要n+1 个副本 | 延迟低 |
Kafka
选择了第二种方案,原因如下:
- 同样为了容忍
n
台节点的故障,第一种方案需要2n+1
个副本,而第二种方案只需要n+1
个副本,而Kafka
的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。 - 虽然第二种方案的网络延迟会比较高,但网络延迟对
Kafka
的影响较小。
ISR:
采用第二种方案之后,设想以下情景:leader
收到数据,所有follower
都开始同步数据,但有一个 follower
,因为某种故障,迟迟不能与 leader
进行同步,那 leader
就要一直等下去,直到它完成同步,才能发送 ack
。这个问题怎么解决呢?
Leader
维护了一个动态的in-sync replica set (ISR)
,意为和leader
保持同步的follower
集合,是一个队列。当ISR
中的follower
完成数据的同步之后,leader
就会给follower
发送ack
。如果follower
长时间未向leader
同步数据,则该follower
将被踢出ISR
,该时间阈值由replica.lag.time.max.ms
参数设定。Leader
发生故障之后,就会从ISR
中选举新的leader
。在这个时间内,就添加到isr中,否则就提出isr集合中,不在isr中的follower也不可能被选举为leader。
rerplica.lag.time.max.ms=10000
//如果leader发现flower超过10秒没有向它发起fech请求, 那么leader考虑这个flower是不是程序出了点问题,或者资源紧张调度不过来, 它太慢了, 不希望它拖慢后面的进度, 就把它从ISR中移除.
rerplica.lag.max.messages=4000
//相差4000条就移除,flower慢的时候, 保证高可用性, 同时满足这两个条件后又加入ISR中,在可用性与一致性做了动态平衡
min.insync.replicas=1
//需要保证ISR中至少有多少个replica
ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR
中的follower
全部接收成功。所以 Kafka
为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
ACKS
参数配置:
- 0:
producer
不等待broker
的ack
,这一操作提供了一个最低的延迟,broker
一接收到还没有写入磁盘就已经返回,当broker
故障时有可能丢失数据; - 1:
producer
等待broker
的ack
,partition
的leader
落盘成功后返回ack
,如果在follower
同步成功之前leader
故障,那么将会丢失数据;
ack=1也可能丢失数据
-1(all):
producer
等待broker
的ack
,partition
的leader
和follower
(这里指的是isr中的follower)全部落盘成功后才返回ack
。但是如果在follower
同步完成后,broker
发送ack
之前,leader
发生故障,那么会造成数据重复。=-1也可能会发生数据的丢失,发生重复数据的情况是leader接收到数据并且在follower之间已经同步完成后,但是此时leader挂掉,没有返回ack确认,此时又重新选举产生了leader,那么producer会重新发送一次数据,所以会导致数据重复。
小结
request.required.asks=0
//0:相当于异步的, 不需要leader给予回复, producer立即返回, 发送就是成功,那么发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息2.Leader与Follower数据不同步), 既有可能丢失也可能会重发
//1:当leader接收到消息之后发送ack, 丢会重发, 丢的概率很小
//-1:当所有的follower都同步消息成功后发送ack. 不会丢失消息
ack=-1数据重复案例
ack是保证生产者生产的数据不丢失,hw是保证消费者消费数据的一致性问题。hw实际就是最短木桶原则,根据这个原则消费者进行消费数据。不能解决数据重复和丢失问题。ack解决丢失和重复问题。
故障处理细节
LEO
:指的是每个副本最大的 offset
,也就是灭一个副本的最后offset值。
HW
:指的是消费者能见到的最大的 offset,ISR队列中最小的 LEO。
follower故障
follower
发生故障后会被临时踢出 ISR
,待该 follower
恢复后,follower
会读取本地磁盘记录的上次的 HW
,并将 log
文件高于 HW
的部分截取掉,从 HW
开始向 leader
进行同步。等该 **follower
**的 **LEO
**大于等于该 **Partition
**的 HW
,即 follower
追上 leader
之后,就可以重新加入 ISR
了。
leader故障
leader
发生故障之后,会从 ISR
中选出一个新的 leader
,之后,为保证多个副本之间的数据一致性,其余的 follower
会先将各自的 log
文件高于 HW
的部分截掉,然后从新的 leader
同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复,ack
确认机制可以保证数据的不丢失和不重复,LEO
和hw
可以保证数据的一致性问题
14.2.32. kafka的ack机制
request.required.acks有三个值 0 1 -1
- 0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
- 1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失
- -1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢失
14.2.32.1. Exactly Once语义
将服务器的 ACK
级别设置为1
,可以保证Producer
到Server
之间不会丢失数据,即At Least Once
语义。相对的,将服务器ACK
级别设置为0
,可以保证生产者每条消息只会被发送一次,即 At Most Once
语义。At Least Once
可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once
可以保证数据不重复,但是不能保证数据不丢失。
但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once
语义。在 0.11
版本以前的 Kafka
,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。0.11
版本的 Kafka
,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer
不论向 Server
发送多少次重复数据,Server
端都只会持久化一条。
幂等性结合At Least Once
语义,就构成了 Kafka
的 Exactly Once
语义。即:At Least Once +幂等性= Exactly Once
要启用幂等性,只需要将 Producer
的参数中 enable.idompotence
设置为true
即可。
Kafka
的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer
在初始化的时候会被分配一个 PID
,发往同一 Partition
的消息会附带Sequence Number
。而Broker
端会对<PID, Partition, SeqNumber>
做缓存,当具有相同主键的消息提交时,Broker
只会持久化一条。但是 PID
重启就会变化,同时不同的 Partition
也具有不同主键,所以幂等性无法保证跨分区跨会话(也就是重新建立producer链接的情况)的 Exactly Once
。即只能保证单次会话不重复问题。幂等性只能解决但回话单分区的问题。
14.2.33. kafka 事务了解吗?
Kafka 在 0.11版本引入事务支持,事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer 事务
为了实现跨分区跨会话事务,需要引入一个全局唯一的 Transaction ID
,并将 Producer 获取的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的 Transaction ID 获取原来的 PID。
为了管理 Transaction
,Kafka 引入了一个新的组件 Transaction Coordinator
。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic
,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
**Consumer 事务 **
上述事务机制主要是从Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其是无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
14.2.34. kafka有那些分区算法
kafka包含三种分区算法:
14.2.34.1. 轮询策略
也称 Round-robin 策略,即顺序分配。比如一个 topic 下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第四条消息时又会重新开始。
轮询策略是 kafka java 生产者 API 默认提供的分区策略。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是平时最常用的分区策略之一。
14.2.34.2. 随机策略
也称 Randomness 策略。所谓随机就是我们随意地将消息放置在任意一个分区上,如下图:
14.2.34.3. 按 key 分配策略
kafka 允许为每条消息定义消息键,简称为 key。一旦消息被定义了 key,那么你就可以保证同一个 key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,如下图所示:
14.2.35. kafka消费者
14.2.35.1. 消费方式
consumer
采用pull
(拉)模式从broker
中读取数据。push
(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer
来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull
模式则可以根据consumer
的消费能力以适当的速率消费消息。对于Kafka
而言,pull
模式更合适,它可简化broker
的设计,consumer
可自主控制消费消息的速率,同时consumer
可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义pull
模式不足之处是,如果kafka
没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka
的消费者在消费数据时会传入一个时长参数timeout
,如果当前没有数据可供消费,consumer
会等待一段时间之后再返回,这段时长即为timeout
。
14.2.35.2. Consumer Group
在 Kafka 中, 一个 Topic 是可以被一个消费组消费, 一个Topic 分发给 Consumer Group 中的Consumer 进行消费, 保证同一条 Message 不会被不同的 Consumer 消费。
注意: 当Consumer Group的 Consumer 数量大于 Partition 的数量时, 超过 Partition 的数量将会拿不到消息
14.2.35.3. 分区分配策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费 Kafka有三种分配策略,一是RoundRobin,一是Range。高版本还有一个StickyAssignor策略 将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:
同一个 Consumer Group 内新增消费者。
消费者离开当前所属的Consumer Group,包括shuts down或crashes。
14.2.35.3.1. Range分区分配策略
Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。假如有10个分区,3个消费者线程,把分区按照序号排列
0,1,2,3,4,5,6,7,8,9
消费者线程为
C1-0,C2-0,C2-1
那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0:0,1,2,3
C2-0:4,5,6
C2-1:7,8,9
如果有11个分区将会是:
C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10
假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:
C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)
14.2.35.3.2. RoundRobinAssignor分区分配策略
RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者. 使用RoundRobin策略有两个前提条件必须满足:
同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;每个消费者订阅的主题必须相同。加入按照 hashCode 排序完的topic-partitions组依次为
T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9
我们的消费者线程排序为
C1-0, C1-1, C2-0, C2-1
最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区
C1-1 将消费 T1-3, T1-1, T1-9 分区
C2-0 将消费 T1-0, T1-4 分区
C2-1 将消费 T1-8, T1-7 分区
14.2.35.3.3. StickyAssignor分区分配策略
Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个 分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目的,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。
假设消费组内有3个消费者
C0、C1、C2
它们都订阅了4个主题:
t0、t1、t2、t3
并且每个主题有2个分区,也就是说整个消费组订阅了
t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区
最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1
这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同
此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1
可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。
如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。
到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。
举例,同样消费组内有3个消费者:
C0、C1、C2
集群中有3个主题:
t0、t1、t2
这3个主题分别有
1、2、3个分区
也就是说集群中有
t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区
消费者C0订阅了主题t0
消费者C1订阅了主题t0和t1
消费者C2订阅了主题t0、t1和t2
如果此时采用RoundRobinAssignor策略:
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2
如果此时采用的是StickyAssignor策略:
消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2
此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:
消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2
StickyAssignor策略,那么分配结果为:
消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2
可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:
t1p0、t1p1、t2p0、t2p1、t2p2。
从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。
14.2.36. Rebalance (重平衡)
Rebalance 本质上是一种协议, 规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
Rebalance 发生时, 所有的 Consumer Group 都停止工作, 直到 Rebalance完成。
14.2.36.1. Coordinator
kafka0.9之后:Group Coordinator 是一个服务, 每个 Broker 在启动的时候都会启动一个该服务, Group Coordinator 的作用是用来存储 Group 的相关 Meta 信息, 并将对应 Partition 的 Offset 信息记录到 Kafka 内置 Topi(__consumer_offsets)中。
Kafka 在0.9之前是基于 Zookeeper 来存储Partition的 offset信息(consumers/{group}/offsets/{topic}/{partition}), 因为 Zookeeper 并不适用于频繁的写操作, 所以在0.9之后通过内置 Topic 的方式来记录对应 Partition 的 offset。
14.2.36.2. 触发条件
- 组成员个数发生变化
- 新的消费者加入到消费组
- 消费者主动退出消费组
- 消费者被动下线. 比如消费者长时间的GC, 网络延迟导致消费者长时间未向Group
Coordinator发送心跳请求, 均会认为该消费者已经下线并踢出
- 订阅的 Topic 的 Consumer Group 个数发生变化
- Topic 的分区数发生变化
14.2.36.3. Rebalace 流程
Rebalance 过程分为两步:Join 和 Sync
- Join: 顾名思义就是加入组. 这一步中, 所有成员都向 Coordinator 发送 JoinGroup 请求, 请求
加入消费组. 一旦所有成员都发送了 JoinGroup 请求, Coordinator 会从中选择一个
Consumer 担任 Leader 的角色, 并把组成员信息以及订阅信息发给 Consumer Leader 注意
Consumer Leader 和 Coordinator不是一个概念. Consumer Leader负责消费分配方案的制
定 - Sync: Consumer Leader 开始分配消费方案, 即哪个 Consumer 负责消费哪些 Topic 的哪些
Partition. 一旦完成分配, Leader 会将这个方案封装进 SyncGroup 请求中发给 Coordinator,
非 Leader 也会发 SyncGroup 请求, 只是内容为空. Coordinator 接收到分配方案之后会把方
案塞进SyncGroup的Response中发给各个Consumer. 这样组内的所有成员就都知道自己应
该消费哪些分区了
14.2.36.4. 如何避免 Rebalance
对于触发条件的 2 和 3, 我们可以人为避免. 1 中的 1 和 3 人为也可以尽量避免, 主要核心为 3
心跳相关
session.timeout.ms = 6s
heartbeat.interval.ms = 2s
消费时间
max.poll.interval.ms
14.2.37. 日志索引
Kafka 能支撑 TB 级别数据, 在日志级别有两个原因:
- 顺序写
- 日志索引.
顺序写后续会讲。
Kafka 在一个日志文件达到一定数据量 (1G) 之后, 会生成新的日志文件, 大数据情况下会有多个日
志文件, 通过偏移量来确定到某行纪录时, 如果遍历所有的日志文件, 那效率自然是很差的. Kafka
在日志级别上抽出来一层日志索引, 来方便根据 offset 快速定位到是某个日志文件
每一个 partition 对应多个个 log 文件(最大 1G), 每一个 log 文件又对应一个 index 文件
通过 offset 查找 Message 流程:
- 先根据 offset (例: 368773), 二分定位到最大 小于等于该 offset 的 index 文件
(368769.index) - . 通过二分(368773 - 368769 = 4)定位到 index 文件 (368769.index) 中最大 小于等于该
offset 的 对于的 log 文件偏移量(3, 497) - 通过定位到该文件的消息行(3, 497), 然后在往后一行一行匹配揭露(368773 830)
14.2.38. 解释如何减少ISR中的扰动?broker什么时候离开ISR?
ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。ISR应该总是包含所有的副本,直到出现真正的故障。
如果一个副本从leader中脱离出来,将会从ISR中删除。那么leader挂掉之后,就会从isr中选取新的leader.
isr就像nameNode和SecondnAMEnODE一样,保存这和Leader完全同步的数据。
扰动,就是说isr中的breaker反复的进入isr列表和退出isr列表,可能是由两个参数控制,第一个是某一个broker多长时间没有和leader同步,或者是相差数据太多导致,可以将这两个参数调节大一点解决。
14.2.39. ISR、OSR、AR 是什么?
ISR:In-Sync Replicas 副本同步队列
OSR:Out-of-Sync Replicas
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(具体可以参见 图文了解 Kafka 的副本复制机制),超过相应的阈值会把 follower 剔除出 ISR, 存入OSR(Out-of-Sync Replicas )列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR
14.2.40. LEO、HW、LSO、LW等分别代表什么?
LEO:是 LogEndOffset 的简称,代表当前日志文件中下一条。
HW:水位或水印(watermark)一词,也可称为高水位(high watermark),通常被用在流式处理领域(比如Apache Flink、Apache Spark等),以表征元素或事件在基于时间层面上的进度。在Kafka中,水位的概念反而与时间无关,而是与位置信息相关。严格来说,它表示的就是位置信息,即位移(offset)。取 partition 对应的 ISR中 最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置上一条信息
LSO:是 LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同。
LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值。
14.2.41. 如何进行 Leader 副本选举?
每个分区的 leader 会维护一个 ISR 集合,ISR 列表里面就是 follower 副本的 Borker 编号,只有“跟得上” Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms
参数配置的。只有 ISR 里的成员才有被选为 leader 的可能。
所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false
的情况下,Kafka 会从 ISR 列表中选择 第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。
14.2.42. 如何进行 broker Leader 选举?
(1) 在 kafka
集群中,会有多个 broker
节点,集群中第一个启动的 broker
会通过在 zookeeper 中创建临时节点 /controller 来让自己成为控制器,其他 broker
启动时也会在zookeeper
中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在 zookeeper
中创建 watch 对象,便于它们收到控制器变更的通知。
(2) 如果集群中有一个 broker
发生异常退出了,那么控制器就会检查这个 broker
是否有分区的副本 leader
,如果有那么这个分区就需要一个新的 leader
,此时控制器就会去遍历其他副本,决定哪一个成为新的 leader
,同时更新分区的 ISR
集合。
(3) 如果有一个 broker
加入集群中,那么控制器就会通过 Broker ID
去判断新加入的 broker
中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。
(4) 集群中每选举一次控制器,就会通过 zookeeper 创建一个 controller epoch,每一个选举都会创建一个更大,包含最新信息的 epoch,如果有 broker 收到比这个 epoch 旧的数据,就会忽略它们,kafka 也通过这个 epoch 来防止集群产生“脑裂”。
14.2.43. Kafka为什么需要复制?
Kafka的信息复制确保了任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
请说明Kafka 的消息投递保证(delivery guarantee)机制以及如何实现?
Kafka支持三种消息投递语义:
At most once:消息可能会丢,但绝不会重复传递
At least one:消息绝不会丢,但可能会重复传递
Exactly once:每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的
consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset,该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同,可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。
如果只讨论这个读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once。
读完消息先处理再commit消费状态(保存offset)。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于At least once
如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交,
但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once,而Exactly once要求与目标存储系统协作,Kafka提供的offset可以较为容易地实现这种方式。
14.2.44. Kafka 的高可靠性是怎么实现的?
注意:也可回答“Kafka在什么情况下会出现消息丢失?”数据可靠性(可回答“怎么尽可能保证Kafka的可靠性?”)
Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从Producter向Broker发送消息、Topic 分区副本以及 Leader选举几个角度介绍数据的可靠性。
14.2.44.1. Topic分区副本
在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,
因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证
数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3
Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
14.2.44.2. Producer往Broker 发送消息
如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过request.required.acks 参数设置的)。
这个参数支持以下三种值:
acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息。
acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个
LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入Leader,但在消息被复制到 follower 副本之前 Leader发生崩溃
acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息
根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性
14.2.44.3. Leader 选举
在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的。只有 ISR 里的成员才有被选为 leader 的可能。
14.2.44.4. 数据一致性(可回答“Kafka数据一致性原理?”)
这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于
ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题
当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数
replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
14.2.45. Kafka 分区数可以增加或减少吗?为什么?
我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。 Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂.
14.2.46. Kafka消息可靠性的保证
Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种。
14.2.46.1. Broker
Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。
那么kafka首先会将数据写入内存页中,系统通过刷盘的方式将数据持久化到内存当中,但是在存储在内存那一会,如果发生断电行为,内存中的数据是有可能发生丢失的,也就是说kafka中的数据可能丢失。
Broker配置刷盘机制,是通过调用fsync函数接管了刷盘动作。从单个Broker来看,pageCache的数据会丢失。
也就是说,理论上,要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。
比如,减少刷盘间隔,减少刷盘数据量大小,也就是频繁的刷盘操作。时间越短,性能越差,可靠性越好(尽可能可靠)。这是一个选择题。
而减少刷盘频率,可靠性不高,但是性能好。
为了解决该问题,kafka通过producer和broker协同处理单个broker丢失参数的情况。一旦producer发现broker消息丢失,即可自动进行retry。除非retry次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。那么producer是如何检测到数据丢失的呢?是通过ack机制,类似于http的三次握手的方式。
acks=0,producer不等待broker的响应,效率最高,但是消息很可能会丢。这种情况也就是说还没有等待leader同步完数据,所以肯定发生数据的丢失。
acks=1,leader broker收到消息后,不等待其他follower的响应,即返回ack。也可以理解为ack数为1。此时,如果follower还没有收到leader同步的消息leader就挂了,那么消息会丢失,也就是如果leader收到消息,成功写入PageCache后,会返回ack,此时producer认为消息发送成功。但此时,数据还没有被同步到follower。如果此时leader断电,数据会丢失。
acks=-1,leader broker收到消息后,挂起,等待所有ISR列表中的follower返回结果后,再返回ack。-1等效与all。这种配置下,只有leader写入数据到pagecache是不会返回ack的,还需要所有的ISR返回“成功”才会触发ack。如果此时断电,producer可以知道消息没有被发送成功,将会重新发送。如果在follower收到数据以后,成功返回ack,leader断电,数据将存在于原来的follower中。在重新选举以后,新的leader会持有该部分数据。数据从leader同步到follower,需要2步:
数据从pageCache被刷盘到disk。因为只有disk中的数据才能被同步到replica。
数据同步到replica,并且replica成功将数据写入PageCache。在producer得到ack后,哪怕是所有机器都停电,数据也至少会存在于leader的磁盘内。
那么在这上面提到一个isr的概念,可以想象以下,如果leader接收到消息之后,一直等待所有的followe返回ack确认,但是有一个发生网络问题,始终无法返回ack怎么版?
显然这种情况不是我们希望的,所以就产生了isr,这个isr就是和leader同步数据的最小子集和,只要在isr中的follower,那么leader必须等待同步完消息并且返回ack才可以,否则就不反悔ack。isr的个数通常通过``min.insync.replicas`参数配置。
借用网上的一张图,感觉说的很明白:
0,1,-1性能一次递减,但是可靠性一直在提高。
14.2.46.2. Producer
Producer丢失消息,发生在生产者客户端。
为了提升效率,减少IO,producer在发送数据时可以将多个请求进行合并后发送。被合并的请求先缓存在本地buffer中。缓存的方式和前文提到的刷盘类似,producer可以将请求打包成“块”或者按照时间间隔,将buffer中的数据发出。通过buffer我们可以将生产者改造为异步的方式,而这可以提升我们的发送效率。
但是,buffer中的数据就是危险的。在正常情况下,客户端的异步调用可以通过callback来处理消息发送失败或者超时的情况,但是,一旦producer被非法的停止了,那么buffer中的数据将丢失,broker将无法收到该部分数据。又或者,当Producer客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是block阻塞),消息也会被丢失。抑或,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。
根据上图,可以想到几个解决的思路:
异步发送消息改为同步发送。或者service产生消息时,使用阻塞的线程池,并且线程数有一定上限。整体思路是控制消息产生速度。
扩大Buffer的容量配置。这种方式可以缓解该情况的出现,但不能杜绝。
service不直接将消息发送到buffer(内存),而是将消息写到本地的磁盘中(数据库或者文件),由另一个(或少量)生产线程进行消息发送。相当于是在buffer和service之间又加了一层空间更加富裕的缓冲层
14.2.46.3. Consumer消费消息有下面几个步骤:
接收消息
处理消息
反馈“处理完毕”(commited)
Consumer的消费方式主要分为两种:
自动提交offset,Automatic Offset Committing
手动提交offset,Manual Offset Control
Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了。此时消息就丢失了。
Properties props = new **Properties**();
props.**put**("bootstrap.servers", "localhost:9092");
props.**put**("group.id", "test");*// 自动提交开关props.put("enable.auto.commit", "true");// 自动提交的时间间隔,此处是1sprops.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {// 调用poll后,1000ms后,消息状态会被改为 committed ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) insertIntoDB(record); // 将消息入库,时间可能会超过1000ms}*
上面的示例是自动提交的例子。如果此时,insertIntoDB(record)
发生异常,消息将会出现丢失。接下来是手动提交的例子:
Properties props = new **Properties**();
props.**put**("bootstrap.servers", "localhost:9092");
props.**put**("group.id", "test");*// 关闭自动提交,改为手动提交props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List> buffer = new ArrayList<>();while (true) {// 调用poll后,不会进行auto commit ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) { buffer.add(record); }if (buffer.size() >= minBatchSize) { insertIntoDb(buffer);// 所有消息消费完毕以后,才进行commit操作 consumer.commitSync(); buffer.clear(); }}*
将提交类型改为手动以后,可以保证消息“至少被消费一次”(at least once)。但此时可能出现重复消费的情况。也就是数据处理完成之后,手动进行提交的方式。
另外,Producer 发送消息还可以选择同步或异步模式,如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。
14.2.47. 为什么kafka中1个partition只能被同组的一个consumer消费?
Kafka通过消费者组机制同时实现了发布/订阅模型和点对点模型。多个组的消费者消费同一个分区属于多订阅者的模式,自然没有什么问题;
而在单个组内某分区只交由一个消费者处理的做法则属于点对点模式。其实这就是设计上的一种取舍,如果Kafka真的允许组内多个消费者消费同一个分区,也不是什么灾难性的事情,只是没什么意义,而且还会重复消费消息。
通常情况下,我们还是希望一个组内所有消费者能够分担负载,让彼此做的事情没有交集,做一些重复性的劳动纯属浪费资源。就如同电话客服系统,每个客户来电只由一位客服人员响应。那么请问我就是想让多个人同时接可不可以?当然也可以了,我不觉得技术上有什么困难,只是这么做没有任何意义罢了,既拉低了整体的处理能力,也造成了人力成本的浪费。
还由另外一点,如果让一个消费者组中的多个消费者消费同一个分区数据,那么我们保证多个消费者之间顺序的去消费数据的话,这里就产生了线程安全的问题,导致系统的设计更加的复杂。
总之,我的看法是这种设计不是出于技术上的考量而更多还是看效率等非技术方面。
14.2.48. kafka和zookeeper的关系
kafka 使用 zookeeper 来保存集群的元数据信息和消费者信息(偏移量),没有 zookeeper,kafka 是工作不起来。在 zookeeper
上会有一个专门用来进行 Broker
服务器列表记录的点,节点路径为/brokers/ids
。
每个 Broker 服务器在启动时,都会到 Zookeeper 上进行注册,即创建 /brokers/ids/[0-N]
的节点,然后写入 IP,端口等信息,Broker 创建的是临时节点,所以一旦 Broker 上线或者下线,对应 Broker 节点也就被删除了,因此可以通过 zookeeper 上 Broker 节点的变化来动态表征 Broker 服务器的可用性。
14.2.49. zookeeper在kafka中的作用
Kafka
集群中有一个 broker
会被选举为 Controller
,负责管理集群 broker
的上下线,所有 topic
的分区副本分配和 leader
选举等工作。
Controller
的管理工作都是依赖于 Zookeeper
的。
Apache Kafka是一个使用Zookeeper构建的分布式系统。Zookeeper的主要作用是在集群中的不同节点之间建立协调;如果任何节点失败,我们还使用Zookeeper从先前提交的偏移量中恢复,因为它做周期性提交偏移量工作。
说明
- 从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合
- 调用配置的分区选择算法选择分区的leader
14.2.49.1. 作用
14.2.49.1.1. Broker注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点。
14.2.49.1.2. Topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录。
14.2.49.1.3. 生产者负载均衡
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
14.2.49.1.4. 消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
14.2.49.1.5. 分区与消费者的关系
消费组(Consumer Group):consumer group下有多个Consumer(消费者),对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。 订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。 同时,Kafka为每个消费者分配一个Consumer ID,通常采用”Hostname:UUID”形式表示。
在Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在Zookeeper上记录 消息分区 与 Consumer之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上。
14.2.49.1.6. 消费进度Offset记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录。节点内容是Offset的值。
14.2.49.1.7. 消费者注册
每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点。
早期版本的Kafka用zk做meta信息存储,consumer的消费状态,group的管理以及offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖。
Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用,kafka不可能越过Zookeeper直接联系Kafka broker,一旦Zookeeper停止工作,它就不能服务客户
端请求。
Zookeeper主要用于在集群中不同节点之间进行通信,在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如:
leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等
14.2.50. Kafka服务器能接收到的最大信息是多少?
Kafka服务器可以接收到的消息的最大大小是1000000字节
14.2.51. Kafka中的ZooKeeper是什么?Kafka是否可以脱离ZooKeeper独立运行?
本篇针对的是2.8版本之前的Kafka,2.8版本及之后Kafka已经移除了对Zookeeper的依赖,通过KRaft进行自己的集群管理,不过目前只是测试阶段。
Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。
不可能越过Zookeeper直接联系Kafka broker,一旦Zookeeper停止工作,它就不能服务客户端请求。
Zookeeper主要用于在集群中不同节点之间进行通信,在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
一个消费者组中只有一个消费者可以消费分区数据,这样所还可以保证线程的安全性,如果由多个消费者可以消费一个分区中的数据,那么如和保证多个线程之间顺序的消费这一个分区中的数据,可能还需要添加锁机制,所以提高了系统的复杂度。
14.2.52. Kafka的高性能的原因
高吞吐
- 顺序读写
- 零拷贝
- 分区+分段(建立索引):并行度高,每一个分区分为多个segment,每一次操作都是针对一小部分数据,并且增加了并行操作的能力。
- 批量发送:kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka
- 等到多少条消息后发送。
- 等待多长时间后发送。
- 数据压缩:Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩压缩的好处就是减少传输的数据量,减轻对网络传输的压力
- producer在发送的压缩数据,kafk不会解压缩,而是直接存储压缩的文件。
容错性:
- 集群的容错性
- partition的leader的容错性
- 数据有副本,保证数据的容错性
高性能
- 顺序读写(比随机读写号很多)
- Kafka是通过文件追加的方式来写入消息的,只能在日志文件的最后追加新的消息,并且不允许修改已经写入的消息,这种方式就是顺序写磁盘,而顺序写磁盘的速度是非常快的。
- 零拷贝技术
- 日志分段存储
- 为了防止日志(Log)过大,Kafka引入了日志分段(LogSegment)的概念,将日志切分成多个日志分段。在磁盘上,日志是一个目录,每个日志分段对应于日志目录下的日志文件、偏移量索引文件、时间戳索引文件(可能还有其他文件)
- 向日志中追加消息是顺序写入的,只有最后一个日志分段才能执行写入操作,之前所有的日志分段都不能写入数据。
- 为了便于检索,每个日志分段都有两个索引文件:偏移量索引文件和时间戳索引文件。每个日志分段都有一个基准偏移量baseOffset,用来表示当前日志分段中第一条消息的offset。偏移量索引文件和时间戳索引文件是以稀疏索引的方式构造的,偏移量索引文件中的偏移量和时间戳索引文件中的时间戳都是严格单调递增的。
- 查询指定偏移量(或时间戳)时,使用二分查找快速定位到偏移量(或时间戳)的位置。可见Kāfk中对消息的查找速度还是非常快的。
- 分区存储:producer可以将数据发送到一个topic下面的多个分区,而这些分区的leadder是部署在不同的节点机器上的,这样的话,肯定比将数据发送到一台机器上性能好,对于消费者,一个分区只能由一个消费者组中的一个消费者消费,这样保证不会重复的消费消息,而多个消费者可以在不同的分区中消费消息,相当于并行读写,所以性能高。
- 分区的设计使得Kafka消息的读写性能可以突破单台broker的/O性能瓶颈,可以在创建主题的时候指定分区数,也可以在主题创建完成之后去修改分区数,通过增加分区数可以实现水平扩展,但是要注意,分区数也不是越多越好,一般达到煤一个阈值之后,再增加分区数性能反而会下降,具体阈值需要对Kak集群进行压测才能确定。
- 页缓存技术:kafka中使用页缓存技术,把对磁盘的io操作,转换为对内存的操作,速度非常快,极大的提高磁盘Io的性能。
补充什么是页缓存:
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘I/O的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异 ,现代操作系统越来越多地将内存作为磁盘缓存,甚至会将所有可用的内存用途磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(page cache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘I/O操作;如果没有命中,则操作系统会向磁盘发起读取请示并将读取的数据页写入页缓存,之后再将数据返回进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以操作数据的一致性。
Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因此之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.message、log.flush.interval.ms等参数来控制。同步刷盘可以提高消息的可行性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过一般不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。
- 顺序读写(比随机读写号很多)
14.2.53. kafka broker 挂了怎么办
controller在启动时会注册zk监听器来监听zookeeper中的/brokers/ids节点下的子节点变化,即集群中所有的broker列表,而每台broker在启动时会向zk的/brokers/ids下写入一个名字为broker.id的临时节点,当该broker挂掉或与zk断开连接时,此临时节点会被移除,之后controller端的监听器就会自动感知这个变化并将BrokerChange时间写入到controller上的请求阻塞队列里。
一旦controller端从阻塞队列中获取到该事件,她会开启BrokerChange事件的处理逻辑,具体包括
1 获取当前存活的broker列表
2 根据之前缓存的broker列表计算出当前已经挂掉的broker列表
3 更新controller端缓存
4 对于当前所有存活的broker,更新元数据信息并且启动新broker上的分区和副本
5 对于挂掉的那些broker,处理这些broker上的分区副本(标记为offline已经执行offline逻辑并更新元数据)
14.2.54. 关于kafka的isr机制
14.2.54.1. kafka replica
- 当某个topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)。kafka的replica包含leader与follower。
- Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,因此可以使用Broker id 指定Partition的Replica。
- 所有Partition的Replica默认情况会均匀分布到所有Broker上。
14.2.54.2. Data Replication如何Propagate(扩散出去)消息?
每个Partition有一个leader与多个follower,producer往某个Partition中写入数据是,只会往leader中写入数据,然后数据才会被复制进其他的Replica中。
数据是由leader push过去还是有flower pull过来?
kafka是由follower周期性或者尝试去pull(拉)过来(其实这个过程与consumer消费过程非常相似),写是都往leader上写,但是读并不是任意flower上读都行,读也只在leader上读,flower只是数据的一个备份,保证leader被挂掉后顶上来,并不往外提供服务。
14.2.54.3. Data Replication何时Commit?
同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高。
异步复制: 只要leader拿到数据立即commit,等follower慢慢去复制,可用性高,立即返回,一致性差一些。
**Commit:**是指leader告诉客户端,这条数据写成功了。kafka尽量保证commit后立即leader挂掉,其他flower都有该条数据。
kafka不是完全同步,也不是完全异步,是一种ISR机制:
- leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
- 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
- 当ISR中所有Replica都向Leader发送ACK时,leader才commit
既然所有Replica都向Leader发送ACK时,leader才commit,那么flower怎么会leader落后太多?
producer往kafka中发送数据,不仅可以一次发送一条数据,还可以发送message的数组;批量发送,同步的时候批量发送,异步的时候本身就是就是批量;底层会有队列缓存起来,批量发送,对应broker而言,就会收到很多数据(假设1000),这时候leader发现自己有1000条数据,flower只有500条数据,落后了500条数据,就把它从ISR中移除出去,这时候发现其他的flower与他的差距都很小,就等待;如果因为内存等原因,差距很大,就把它从ISR中移除出去。
commit策略:
server配置
rerplica.lag.time.max.ms=10000
# 如果leader发现flower超过10秒没有向它发起fech请求,那么leader考虑这个flower是不是程序出了点问题
# 或者资源紧张调度不过来,它太慢了,不希望它拖慢后面的进度,就把它从ISR中移除。
rerplica.lag.max.messages=4000 # 相差4000条就移除
# flower慢的时候,保证高可用性,同时满足这两个条件后又加入ISR中,
# 在可用性与一致性做了动态平衡 亮点1234567
topic配置
min.insync.replicas=1 # 需要保证ISR中至少有多少个replica1
producer配置
request.required.asks=0
# 0:相当于异步的,不需要leader给予回复,producer立即返回,发送就是成功,
那么发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步),
既有可能丢失也可能会重发 # 1:当leader接收到消息之后发送ack,丢会重发,丢的概率很小
# -1:当所有的follower都同步消息成功后发送ack. 丢失消息可能性比较低123456
14.2.54.4. Data Replication如何处理Replica恢复
leader挂掉了,从它的follower中选举一个作为leader,并把挂掉的leader从ISR中移除,继续处理数据。一段时间后该leader重新启动了,它知道它之前的数据到哪里了,尝试获取它挂掉后leader处理的数据,获取完成后它就加入了ISR。
14.2.54.5. Data Replication如何处理Replica全部宕机
1、等待ISR中任一Replica恢复,并选它为Leader
- 等待时间较长,降低可用性
- 或ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用
2、选择第一个恢复的Replica为新的Leader,无论它是否在ISR中
- 并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失
- 可用性较高
14.2.55. Exactly Once语义
将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。
但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的Kafka,引入了一项重大特性:幂等性。
开启幂等性enable.idempotence=true。
所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:
At Least Once + 幂等性 = Exactly Once
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
补充,在流式计算中怎么Exactly Once语义?以flink为例
- souce:使用执行ExactlyOnce的数据源,比如kafka等
内部使用FlinkKafakConsumer,并开启CheckPoint,偏移量会保存到StateBackend中,并且默认会将偏移量写入到topic中去,即_consumer_offsets Flink设置CheckepointingModel.EXACTLY_ONCE
- sink
存储系统支持覆盖也即幂等性:如Redis,Hbase,ES等 存储系统不支持覆:需要支持事务(预写式日志或者两阶段提交),两阶段提交可参考Flink集成的kafka sink的实现。