Kafka技术手册(基础)
Kafk手册
初级
请说明什么是Apache Kafka?
Kafka 是由 Linkedin
公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
Kafka的基本术语
消息:Kafka 中的数据单元被称为消息
,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率, 消息会分批次
写入 Kafka,批次就代指的是一组消息。
主题:消息的种类称为 主题
(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
offset:偏移量,kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafkaOffset
Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
Producer:消息生产者,向kafka broker发消息的客户端
Consumer:消息消费者,向kafka broker取消息的客户端
Group消费者组:这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic
生产者: 向主题发布消息的客户端应用程序称为生产者
(Producer),生产者用于持续不断的向某个主题发送消息。
消费者:订阅主题消息的客户端程序称为消费者
(Consumer),消费者用于处理生产者产生的消息。
消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组
(Consumer Group)指的就是由一个或多个消费者组成的群体。
偏移量:偏移量
(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
broker: 一个独立的 Kafka 服务器就被称为 broker
,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 集群:broker 是集群
的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器
的角色(自动从集群的活跃成员中选举出来)。
副本:Kafka 中消息的备份又叫做 副本
(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。或者说某一个消费者加入一个消费者组,那么就需要重新平衡消息的发送,Rebalance 是 Kafka 消费者端实现高可用的重要手段。
请说明什么是传统的消息传递方法?
传统的消息传递方法包括两种:
- 队列:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人,某个消费者消费数据之后,消息会随机删除。
- 发布-订阅:在这个模型中,消息被广播给所有的用户,可以有多个生产者和多个消费者。
数据传输的事务有几种?
数据传输的事务定义通常有以下三种级别:
- 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输,这种情况会发生数据的丢失。
- 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
- 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的,端到端的精确一次性最难保证。
使用消息队列的好处
- 解耦
- 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 可恢复性
- 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
- 缓冲
- 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 灵活性 &峰值处理能力
- 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 异步通信
- 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
为什么选择kafka
kafka由多个生产者
KafKa 可以无缝地支持多个生产者,不管客户端使用一个主题,还是多个主题。Kafka 适合从多个前端系统收集数据,并以统一的格式对外提供数据。
多个消费者
Kafka 支持多个消费者从一个单独的消息流中读取数据,并且消费者之间互不影响。这与其他队列系统不同,其他队列系统一旦被客户端读取,其他客户端就不能 再读取它。并且多个消费者可以组成一个消费者组,他们共享一个消息流,并保证消费者组对每个给定的消息只消费一次。
多个生产者多个消费者,天然支持的并发量大,消息的吞吐量自然也就大;
基于磁盘的消息存储
Kafka 允许消费者非实时地读取消息,原因在于 Kafka 将消息提交到磁盘上,设置了保留规则进行保存,无需担心消息丢失等问题。
伸缩性
可扩展多台 broker。用户可以先使用单个 broker,到后面可以扩展到多个 broker。
高性能
Kafka 可以轻松处理百万千万级消息流,同时还能保证 亚秒级 的消息延迟。
Kafka生产者分区策略
- 指明 partition 的情况下,直接将指明的值直接作为partiton值。
- 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值。
- 既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
Kafka 缺点?
- 由于是批量发送,数据并非真正的实时
- 对于mqtt协议不支持
- 不支持物联网传感数据直接接入
- 仅支持统一分区内消息有序,无法实现全局消息有序
- 监控不完善,需要安装插件
- 依赖zookeeper进行元数据管理。
Kafka消息队列
Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式
点对点模式
Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式.
点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
- 消费者去队列中拉去消息,即使有多个消费者,那生产者也会把消息发送到队列中,多个消费者轮询到队列中拉去消息。
发布订阅模式
发布订阅模式有两种形式,一种是消费者主动拉取数据的形式,另一种是生产者推送消息的形式,kafka是基于发布订阅模式中消费者拉取的方式。消费者的消费速度可以由消费者自己决定。但是这种方式也有缺点,当没有消息的时候,kafka的消费者还需要不停的访问kafka生产者拉取消息,浪费资源。
- 这种方式中,消费者和生产者都可以有多个。
请简述下你在哪些场景下会选择 Kafka?
日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
流式处理:比如spark streaming和 Flink
如何保证每个应用程序都可以获取到 Kafka 主题中的所有消息,而不是部分消息?
为每个应用程序创建一个消费者组,然后往组中添加消费者来伸缩读取能力和处理能力,每个群组消费主题中的消息时,互不干扰。
本质上是实现广播的功能。
Kafka的设计是什么样的呢?
Kafka将消息以topic为单位进行归纳
将向Kafka topic发布消息的程序成为producers.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息
将预订topics并消费消息的程序成为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
partition的数据如何保存到硬盘
topic中的多个partition以文件夹的形式保存到broker(也就是每一个kafka的服务器上),每个分区序号从0递增,且消息有序,Partition文件下有多个segment(xxx.index,xxx.log),segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为1g,如果大小大于1g时,会滚动一个新的segment并且以上一个segment最后一条消息的偏移量命名。
Kafka的消费者如何消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置,等到下次消费时,他会接着上次位置继续消费。
这个offset的位置是由zookeeper来维护的。
kafka应用场景
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势,
kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(⽐如,消息重发,消息发送丢失等)
kafka的特性决定它非常适合作为"日志收集中心";
application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;
kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.
中级
Kafka 的设计架构?
简单的设计架构
Kafak 总体架构图中包含多个概念:
(1)ZooKeeper:Zookeeper
负责保存 broker
集群元数据,并对控制器进行选举等操作。
(2)Producer:生产者负责创建消息,将消息发送到 Broker。
(3)Broker: 一个独立的 Kafka
服务器被称作 broker
,broker 负责接收来自生产者的消息,为消息设置偏移量,并将消息存储在磁盘。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
(4)Consumer:消费者负责从 Broker
订阅并消费消息。
(5)Consumer Group:Consumer Group
为消费者组,一个消费者组可以包含一个或多个 Consumer
。
使用 多分区 + 多消费者 方式可以极大 提高数据下游的处理速度,
同一消费者组中的消费者不会重复消费消息
,同样的,不同消费组中的消费者消费消息时互不影响。Kafka 就是通过消费者组的方式来实现消息 P2P 模式和广播模式。
(6)Topic:Kafka 中的消息 以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
(7)Partition:一个 Topic 可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的 日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的 偏移量(offset)。
(8)Offset:offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka保证的是分区有序性而不是主题有序性。
(9)Replication:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
(10)Record:实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了key
、value
和 timestamp
。
(11)Leader: 每个分区多个副本的 "主" leader,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
(12)follower: 每个分区多个副本中的"从" follower,实时从 Leader 中同步数据,保持和 leader 数据的同步。Leader 发生故障时,某个 follow 会成为新的 leader。
介绍一下零拷贝技术
从 Kafka 里经常要消费数据,那么消费的时候实际上就是要从 kafka 的磁盘文件里读取某条数据然后发送给下游的消费者,如下图所示。
那么这里如果频繁的从磁盘读数据然后发给消费者,会增加两次没必要的拷贝,如下图:
一次是从操作系统的 cache 里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。
而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。
Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。
也就是说,直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存,如下图所示:
通过 零拷贝技术,就不需要把 os cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。
对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
Kafka 从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。
kafka 集群经过良好的调优,数据直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。相当于 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。
传统IO流程:
第一次:将磁盘文件,读取到操作系统内核缓冲区。
第二次:将内核缓冲区的数据,copy到application应用程序的buffer。
第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区)
第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。
传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。
重新思考传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。
显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。
所以零拷贝是指读取磁盘文件后,不需要做其他处理,直接用网络发送出去。
实现过程:
简单的来说,比如用户读取一个文件,那么首先文件会读取到内存的缓冲区,然后在从缓冲区通过网络将数据发送给用户。可以抽象为下面两部:
read(file, tmp_buf, len);
write(socket, tmp_buf, len);
但是实际上,中间经历了四个过程,因为读取文件需要在用户态和和心态之间相互转换:
- 程序调用 read 产生一次用户态到内核态的上下文切换。DMA 模块从磁盘读取文件内容,将其拷贝到内核空间的缓冲区,完成第 1 次拷贝。
- 数据从内核缓冲区拷贝到用户空间缓冲区,之后系统调用 read 返回,这回导致从内核空间到用户空间的上下文切换。这个时候数据存储在用户空间的 tmp_buf 缓冲区内,可以后续的操作了。
- 程序调用 write 产生一次用户态到内核态的上下文切换。数据从用户空间缓冲区被拷贝到内核空间缓冲区,完成第 3 次拷贝。但是这次数据存储在一个和 socket 相关的缓冲区中,而不是第一步的缓冲区。
- write 调用返回,产生第 4 个上下文切换。第 4 次拷贝在 DMA 模块将数据从内核空间缓冲区传递至协议引擎的时候发生,这与我们的代码的执行是独立且异步发生的。你可能会疑惑:“为何要说是独立、异步?难道不是在 write 系统调用返回前数据已经被传送了?write 系统调用的返回,并不意味着传输成功——它甚至无法保证传输的开始。调用的返回,只是表明以太网驱动程序在其传输队列中有空位,并已经接受我们的数据用于传输。可能有众多的数据排在我们的数据之前。除非驱动程序或硬件采用优先级队列的方法,各组数据是依照FIFO的次序被传输的(上图中叉状的 DMA copy 表明这最后一次拷贝可以被延后)。
Mmap
上面的数据拷贝非常多,我们可以减少一些重复拷贝来减少开销,提升性能。某些硬件支持完全绕开内存,将数据直接传送给其他设备。这个特性消除了系统内存中的数据副本,因此是一种很好的选择,但并不是所有的硬件都支持。此外,来自于硬盘的数据必须重新打包(地址连续)才能用于网络传输,这也引入了某些复杂性。为了减少开销,我们可以从消除内核缓冲区与用户缓冲区之间的拷贝开始。
减少数据拷贝的一种方法是将 read 调用改为 mmap。例如:
tmp_buf = mmap(file, len);
write(socket, tmp_buf, len);
mmap 调用导致文件内容通过 DMA 模块拷贝到内核缓冲区。然后与用户进程共享缓冲区,这样不会在内核缓冲区和用户空间之间产生任何拷贝。
write 调用导致内核将数据从原始内核缓冲区拷贝到与 socket 关联的内核缓冲区中。
第 3 次数据拷贝发生在 DMA 模块将数据从 socket 缓冲区传递给协议引擎时。
高性能高吞吐的技术实现
页缓存技术
Kafka
是基于 操作系统 的页缓存
来实现文件写入的。
操作系统本身有一层缓存,叫做 page cache,是在 内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。
Kafka 在写入磁盘文件的时候,可以直接写入这个 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入磁盘文件中。通过这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,原理图如下:
顺序写
另一个主要功能是 kafka 写数据的时候,是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据
。
普通的机械磁盘如果你要是随机写的话,确实性能极差,也就是随便找到文件的某个位置来写数据。
但是如果你是 追加文件末尾 按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能相差无几。
基于上面两点,kafka 就实现了写入数据的超高性能。
为什么可以这样做,因为kafka每一个分区内部的数据是有序的,落盘后也是有序的,因此可以顺序写从盘;
批发送
批处理是一种常用的用于提高I/O性能的方式. 对Kafka而言, 批处理既减少了网络传输的Overhead, 又提高了写磁盘的效率. Kafka 0.82 之后是将多个消息合并之后再发送, 而并不是send一条就立马发送(之前支持)
# 批量发送的基本单位, 默认是16384Bytes, 即16kB
batch.size
# 延迟时间
linger.ms
# 两者满足其一便发送
数据压缩
数据压缩的一个基本原理是, 重复数据越多压缩效果越好. 因此将整个Batch的数据一起压缩能更大幅度减小数据量, 从而更大程度提高网络传输效率。
Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘, Consumer接受到压缩后的数据再解压缩,
整体来讲: Producer 到 Broker, 副本复制, Broker 到 Consumer 的数据都是压缩后的数据, 保证高效率的传输
零拷贝技术
Kafka 分区的目的?
分区对于Kafka 集群的好处是:实现负载均衡。
分区对于消费者来说,可以提高并发度,提高效率,当然,对于是生产者也可以提高并行度的目的,将数据写入多个分区,提高写入速度,各个分区都是独立的,不会产生并发安全问题。
如果我们假设像标准 MQ 的 Queue, 为了保证一个消息只会被一个消费者消费, 那么我们第一想到的就是加锁. 对于发送者, 在多线程并且非顺序写环境下, 保证数据一致性, 我们同样也要加锁. 一旦考虑到加锁, 就会极大的影响性能.
我们再来看Kafka 的 Partition, Kafka 的消费模式和发送模式都是以 Partition 为分界. 也就是说对于一个 Topic 的并发量限制在于有多少个 Partition, 就能支撑多少的并发. 可以参考 Java 1.7 的 ConcurrentHashMap 的桶设计, 原理一样, 有多少桶, 支持多少的并发
说一下什么是副本?
kafka 为了保证数据不丢失,从 0.8.0
版本开始引入了分区副本机制。在创建 topic 的时候指定 replication-factor
,默认副本为 3 。
副本是相对 partition 而言的,一个分区中包含一个或多个副本,其中一个为leader
副本,其余为follower
副本,各个副本位于不同的 broker
节点中。
所有的读写操作
都是经过 Leader 进行的,同时 follower 会定期地去 leader 上复制数据。当 Leader 挂掉之后,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
Kafka消息是采用Pull模式,还是Push模式?
Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。
一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式
Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略
Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发。
kafka消息队列优点或者对比
请说明Kafka相对于传统的消息传递方法有什么优势?
- 高性能:单一的Kafka代理可以处理成千上万的客户端,每秒处理数兆字节的读写操作,Kafka性能远超过传统的ActiveMQ、RabbitMQ等,而且Kafka支持Batch操作
- 可扩展:Kafka集群可以透明的扩展,增加新的服务器进集群
- 容错性: Kafka每个Partition数据会复制到几台服务器,当某个Broker失效时,Zookeeper将通知生产
者和消费者从而使用其他的Broker。 - 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
- 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
- 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
Kafka与传统消息队列的区别?
在说区别的时候,我们先来看看kafka的应用场景:
kafka是个日志处理缓冲组件,在大数据信息处理中使用。和传统的消息队列相比较简化了队列结构和功能,以流形式处理存储(持久化)消息(主要是日志)。日志数据量巨大,处理组件一般会处理不过来,所以作为缓冲曾的kafka,支持巨大吞吐量。为了防止信息丢失,其消息被消费后不直接丢弃,要多存储一段时间,等过期时间过了才丢弃。这是mq和redis不能具备的。
主要特点入下:
巨型存储量:
- 支持TB甚至PB级别数据,因为数据是持久化在磁盘,因此可以做到大数据量的存储;
高吞吐,高IO:
- 一般配置的服务器能实现单机每秒100K条以上消息的传输。
消息分区,分布式消费:
- 首先kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区,这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。也做到了负载均衡的目的,将数据均衡到堕胎服务器的多个分区中。
- 能保消息顺序传输。 支持离线数据处理和实时数据处理。
Scale out:
- 支持在线水平扩展,以支持更大数据处理量。
高可用机制:
- 其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。
- 然后保证分区内部消息的消费有序性。
消费者组:
- Kafka还具有consumer group的概念,每个分区只能被同一个group的一个consumer消费,但可以被多个group消费。
而传统的消息队列,比如Rides:
redis只是提供一个高性能的、原子操作内存键值队,具有高速访问能力,可用做消息队列的存储,但是不具备消息队列的任何功能和逻辑,要作做为消息队列来实现的话,功能和逻辑要通过上层应用自己实现。
redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送,并不保证可靠。
作为消息队列来说,企业中选择mq的还是多数,因为像Rabbit,Rocket等mq中间件都属于很成熟的产品,性能一般但可靠性较强,而kafka原本设计的初衷是日志统计分析,现在基于大数据的背景下也可以做运营数据的分析统计,而redis的主要场景是内存数据库,作为消息队列来说可靠性太差,而且速度太依赖网络IO,在服务器本机上的速度较快,且容易出现数据堆积的问题,在比较轻量的场合下能够适用
我们还是以RabbitMQ为例介绍。它是用Erlang语言开发的开源的消息队列,支持多种协议,包括AMQP,XMPP, SMTP, STOMP。适合于企业级的开发。
MQ支持Broker构架,消息发送给客户端时需要在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
Kafka判断一个节点是否还活着有那两个条件?
节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接
如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久,注意,这里的同步,同步的是isr中的follower节点。
Kafa consumer是否可以消费指定分区消息?
Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的,但是这样必须使用kafka低级API去做,分区和offset的控制完全交给了应用程序;
producer是否直接将数据发送到broker的leader(主节点)?
producer直接将数据发送到topic的leader(主节点),不需要在多个节点进行分发,为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。
Kafka存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。
- 消息长度: 4 bytes (value: 1+4+n)
- 版本号: 1 byte
- CRC校验码: 4 bytes
- 具体的消息: n bytes
Kafka高效文件存储设计特点:
- Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用,健儿来说就是通过一种分而治之的方法,逐步清除没有用的小文件。
- 通过索引信息可以快速定位message和确定response的最大大小。
- 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
- 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
Kafka工作流程
- 消息分类按不同类别,分成不同的Topic,Topic⼜又拆分成多个partition,每个partition副本均衡分散到不同的服务器器(提高并发访问的能力)
- 消费者按顺序从partition中读取,不支持随机读取数据,但可通过改变保存到zookeeper中的offset位置实现从任意位置开始读取。
- 服务器消息定时清除(不管有没有消费)
- 每个partition还可以设置备份到其他服务器上的个数以保证数据的可⽤性。通过Leader,Follower方式。
- zookeeper保存kafka服务器和客户端的所有状态信息.(确保实际的客户端和服务器轻量级)
- 在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,⼀个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过⼀个consumer可以消费多个partitions中的消息。
- 如果所有的consumer都具有相同的group(也就是所有的消费者再同一个消费者组),这种情况和queue模式很像;消息将会在consumers之间负载均衡.
- 如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
- 持久性,当收到的消息时先buffer起来,等到了一定的阀值再写入磁盘文件,减少磁盘IO.在一定程度上依赖OS的文件系统(对文件系统本身优化几乎不可能),所以在这个过程中,如果断电的话,存在消息的丢失。
- 除了磁盘IO,还应考虑网络IO,批量对消息发送和接收,并对消息进行压缩。
- 在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这种模式有些优点,首先consumer端可以根据⾃己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.
- kafka无需记录消息是否接收成功,是否要重新发送等,所以kafka的producer是非常轻量级的,consumer端也只需要将fetch后的offset位置注册到zookeeper,所以也是非常轻量级的.
生产者向 Kafka 发送消息的执行流程介绍一下?
- 生产者要往 Kafka 发送消息时,需要创建 ProducerRecoder,代码如下:
ProducerRecord<String,String> record
= new ProducerRecoder<>("CostomerCountry","Precision Products","France");
try{
producer.send(record);
}catch(Exception e){
e.printStackTrace();
}
ProducerRecoder 对象会包含目标 topic,分区内容,以及指定的 key 和 value,
在发送 ProducerRecoder 时,生产者会先把键和值对象序列化成字节数组
,然后在网络上传输。生产者在将
消息
发送到某个 Topic ,需要经过拦截器、序列化器和分区器(Partitioner)。如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
- 若没有指定分区,且消息的 key 不为空,则使用 murmur 的 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。
- 若没有指定分区,且消息的 key 也是空,则用轮询的方式选择一个分区。
分区选择好之后,会将消息添加到一个记录批次中,这个批次的所有消息都会被发送到相同的 Topic 和 partition 上。然后会有一个独立的线程负责把这些记录批次发送到相应的 broker 中。
broker 接收到 Msg 后,会作出一个响应。如果成功写入 Kafka 中,就返回一个RecordMetaData 对象,它包含
Topic
和Partition
信息,以及记录在分区的offset
。若写入失败,就返回一个错误异常,生产者在收到错误之后尝试重新发送消息,几次之后如果还失败,就返回错误信息。
Producer 发送的一条 message 中包含哪些信息?
消息由 可变长度
的 报头、可变长度
的 不透明密钥字节数组和 可变长度
的 不透明值字节数组 组成。
RecordBatch 是 Kafka 数据的存储单元,一个 RecordBatch 中包含多个 Record
(即我们通常说的一条消息)。
一个 RecordBatch 中可以包含多条消息,即上图中的 Record,而每条消息又可以包含多个 Header 信息,Header 是 Key-Value 形式的。
kafka 如何实现多线程的消费?
kafka 允许同组的多个 partition 被一个 consumer 消费,但不允许一个 partition 被同组的多个 consumer 消费。
实现多线程步骤如下:
- 生产者随机分区提交数据(自定义随机分区)。
- 消费者修改单线程模式为多线程,在消费方面得注意,得遍历所有分区,否则还是只消费了一个区。
如何保证Kafka的消息有序
Kafka只能保证一个partition中的消息被某个consumer消费时是顺序的,事实上,从Topic角度来说, 当有多个partition时,消息仍然不是全局有序的。
生产者:通过分区的 leader 副本负责数据以先进先出的顺序写入,来保证消息顺序性。
在这里也需要说一下,如果是一个消费者组中的多个消费者去消费一个分区的话,就不能保证消费数据时候的消费顺序,并且会产生并发安全问题。
消费者:同一个分区内的消息只能被一个 group 里的一个消费者消费,保证分区内消费有序。
kafka 每个 partition 中的消息在写入时都是有序的,消费时, 每个 partition 只能被每一个消费者组中的一个消费者消费,保证了消费时也是有序的。
整个 kafka 不保证有序。如果为了保证 kafka 全局有序,那么设置一个生产者,一个分区,一个消费者。
kafka 如何保证数据的不重复和不丢失?
exactly once 模式 精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。
kafka 默认的模式是 at least once ,但这种模式可能会产生重复消费的问题,所以在业务逻辑必须做幂等设计。
使用 exactly Once + 幂等操作,可以保证数据不重复,不丢失。exactly Once可以做到重复消费,即消息不丢失,幂等性可以做到下游只使用一次,配合起来就做到了精确一致性;
kafka如何保证对应类型数据写入相同的分区
通过 消息键 和 分区器 来实现,分区器为键
生成一个 offset
,然后使用 offset 对主题分区进行取模,为消息选取分区,这样就可以保证包含同一个键的消息会被写到同一个分区上。
- 如果
ProducerRecord
没有指定分区,且消息的key 不为空
,则使用Hash 算法
(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区分配。 - 如果 ProducerRecord 没有指定分区,且消息的
key 也是空
,则用 轮询 的方式选择一个分区。
Kafka创建Topic时如何将分区放置到不同的Broker中
- 副本因子不能大于 Broker 的个数;
- 第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
- 其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
- 剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的。
Kafka新建的分区会在哪个目录下创建
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。
当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。
如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。
但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?
答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。