栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > Java

全面了解 kafka 的使用和特性

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

全面了解 kafka 的使用和特性

全面了解 kafka 的使用和特性

文章目录
  • 全面了解 kafka 的使用和特性
    • 1. kafka概述
      • 1.1 Kafka是什么
      • 1.2 Kafka的特性
      • 1.3 Kafka场景应用
    • 2. 消息队列内部的实现原理
      • 2.1 为什么需要消息队列
        • 2.2 异步处理
        • 2.3 应用解耦
        • 2.4 流量削锋
        • 2.5 日志处理
        • 2.6 消息通讯
      • 2.2 消息队列通信的模式
        • 2.2.1 点对点模式
        • 2.2.2 发布订阅模式
    • 3. Kafka的架构原理
      • 3.1 基础架构
      • 3.2 名词解释
      • 3.3 工作流程
      • 3.4 存储机制
      • 3.5 生产者
        • 3.5.1. 分区策略
        • 3.5.2. 数据可靠性
          • 1. 副本数据同步策略
          • 2. ISR
          • 3. ack 应答机制
          • 4. 故障处理细节
        • 3.5.3. Kafka消息传递语义
        • 3.5.4. 幂等性
      • 3.6 消费者
        • 3.6.1. 消费方式
        • 3.6.2. 分区策略
        • 3.6.3. offset维护
        • 3.6.4. Rebalance机制
          • 1. Coordinator
          • 2. Coordinator生命周期
          • 3. Rebalance流程
          • 4. Generation机制
          • 5. Leader Consumer
          • 6. rebalance场景剖析
            • 1. 新成员加入组(member join)
            • 2. 组成员崩溃(member failure)
            • 3. 组成员主动离组(member leave group)
            • 4. 提交位移(member commit offset)
      • 3.7 延迟队列消费
      • 3.8 时间轮机制
        • 3.8.1 什么会有要设计时间轮?
        • 3.8.2 时间轮是什么?
        • 3.8.3 多层级的时间轮
    • 4. kafka为什么那么快
      • 4.1. 顺序写磁盘
      • 4.2. 零拷贝技术
        • 4.2.1 DMA
        • 4.2.2 传统消费者读取数据流程
        • 4.2.3 kafka sendfile技术 — 零拷贝
      • 4.3 Page Cache
      • 4.4 批处理
      • 4.5 数据压缩
    • 5. zookeeper在kafka中的作用
    • 6. kafka事务
    • 7. kafka安装部署
      • 7.1 安装JAVA JDK
      • 7.2 安装ZooKeeper
      • 7.3 安装Kafka
    • 8. Spring Boot整合Kafka
    • 9. 总结
      • 9.1 可以简述下Kafka架构中比较重要的关键字吗?比如Partition,Broker,你都是怎么理解的?
      • 9.2 那我们为什么要选择 Kafka 呢?
      • 9.3 那为什么Kafka的吞吐量远高于其他同类中间件?
      • 9.3 如何避免重复消费?
      • 9.4 如何延迟消费?
      • 9.5 频繁rebanlence怎么解决?


1. kafka概述

至于为什么叫kafka呢?是因为创作它的程序员叫做jay krep,他非常喜欢 弗兰兹·卡夫卡,觉的kafka这个名字很酷,所以就起了这个名字。名字没有什么特别的意思。

1.1 Kafka是什么

Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。

Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。

Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

Kafka 是我工作多年使用最多的消息中间件 ,特点是拥有巨大吞吐量(数百万/秒),作为当下最流行的分布式,可水平扩展,可容错的“消息系统”。

1.2 Kafka的特性
  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写
1.3 Kafka场景应用
  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源

回到顶部


2. 消息队列内部的实现原理 2.1 为什么需要消息队列

消息系统的核心作用就是三点:解耦,异步和并行

在高并发场景下,由于服务端来不及同步处理数量过多的请求,可能导致请求堵塞。例如,大量的 INSERT、UPDATe 之类的请求同时到达 MySQL 服务端,在执行这些请求的过程中,会出现大量的行锁、表锁,甚至到最后,由于请求堆积过多,触发“too many connections”错误。在这类高并发场景下,通过使用消息队列,我们就可以异步处理这些请求,从而缓解系统的压力。

2.2 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

1、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

2、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2.3 应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.4 流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  • 可以控制活动的人数
  • 可以缓解短时间内高流量压垮应用

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

秒杀业务根据消息队列中的请求信息,再做后续处理

2.5 日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下

  • 日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
  • 消息队列,负责日志数据的接收,存储和转发
  • 日志处理应用:订阅并消费kafka队列中的日志数据
2.6 消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等

点对点通讯:

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考

回到顶部


2.2 消息队列通信的模式 2.2.1 点对点模式


点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。

2.2.2 发布订阅模式


发布订阅模式是一个基于消息送的消息传送模型,该模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(类似微信公众号)。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是consumer1、consumer2、consumer3由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是8M/s、5M/s、2M/s,如果队列推送的速度为5M/s,则consumer3无法承受!如果队列推送的速度为2M/s,则consumer1、consumer2会出现资源的极大浪费!

回到顶部


3. Kafka的架构原理

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力。

3.1 基础架构

从一个较高的层面上来看,producer通过网络发送消息到Kafka集群,然后consumer来进行消费,如下图:

3.2 名词解释
  • Producer: Producer即生产者,消息的产生者,是消息的入口。
  • Broker: Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
  • Topic: 消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
  • Partition: Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
    Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  • Message: 每一条发送的消息主体。
  • Consumer: 消费者,即消息的消费方,是消息的出口。
  • Consumer Group: 我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
  • Zookeeper: kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
由上图可知,kafka依赖zookeeper。

3.3 工作流程


Kafka 中消息是以 topic 进行分类的, 生产者生产消息,消费者消费消息,都是面向 topic的。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。 Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。 消费者组中的每个消费者, 都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

3.4 存储机制


由于生产者生产的消息会不断追加到 log 文件末尾, 为防止 log 文件过大导致数据定位效率低下, Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。 每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下, 该文件夹的命名规则为: topic 名称+分区序号。例如, first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

回到顶部


3.5 生产者 3.5.1. 分区策略
  • 分区的原因
    • 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition组成,因此整个集群就可以适应任意大小的数据了;
    • 可以提高并发,因为可以以 Partition 为单位读写了。
  • 分区的原则
    • 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
    • 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到
      partition 值;
    • 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与
      topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。
3.5.2. 数据可靠性

为保证 producer 发送的数据,能可靠的发送到指定的 topic, topic 的每个 partition 收到producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到) ,如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据。

1. 副本数据同步策略
方案优点缺点
半数以上完成同步,就发送ack延迟低选举新的leader时,容忍n台节点的故障,需要2n+1个副本
全部完成同步,才发送ack选举新的leader时,容忍n台节点的故障,需要n+1个副本延迟高

Kafka 选择了第二种方案,原因如下:
1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据, 第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。

2. ISR

采用第二种方案之后,设想以下情景: leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后, leader 就会给 follower 发送 ack。如果 follower长时间未向 leader 同步数据 ,则该 follower 将被踢出 ISR , 该时间阈值由replica.lag.time.max.ms 参数设定。 Leader 发生故障之后,就会从 ISR 中选举新的 leader。

3. ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks 参数配置:
0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;
-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前, leader 发生故障,那么会造成数据重复。

4. 故障处理细节


LEO:指的是每个副本最后一个的 offset;
HW:指的是所有副本中最小的 LEO。
(1) follower 故障
follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
(2) leader 故障
leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。

注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

3.5.3. Kafka消息传递语义

at most once:最多一次。消息可能会丢失,但是不会重复。
at least once:最少一次。消息不会丢失,但是可能会重复,
exactly once:精确一次。消息只会被精确处理一次,不会丢失重复。

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。

At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At More Once可以保证数据不重复,但是不能保证数据不丢失。 但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。

在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:
At Least Once + 幂等性 = Exactly Once

3.5.4. 幂等性

要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对做缓存,当具有相同主键的消息提交时, Broker 只会持久化一条。但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

回到顶部


3.6 消费者 3.6.1. 消费方式

consumer 采用 pull(拉) 模式从 broker 中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中, 一直返回空数据。 针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费, consumer 会等待一段时间之后再返回,这段时长即为 timeout。

3.6.2. 分区策略

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。

Kafka 分配策略:RoundRobin、 Range、sticky。

比如我们消费的一个主题有12个分区:

p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

假设我们的消费者组里面有三个消费者

range策略

  • range策略就是按照partiton的序号范围
  • p0~3 consumer1
  • p4~7 consumer2
  • p8~11 consumer3
  • 默认就是这个策略;

round-robin策略

  • 就是轮询分配
  • consumer1:0,3,6,9
  • consumer2:1,4,7,10
  • consumer3:2,5,8,11

但是前面的这两个方案有个问题:12 -> 2 每个消费者会消费6个分区

假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3,这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。

sticky策略

最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

  • consumer1:0-3
  • consumer2: 4-7
  • consumer3: 8-11

假设consumer3挂了

  • consumer1:0-3,+8,9
  • consumer2: 4-7,+10,11
3.6.3. offset维护

由于 consumer 在消费过程中可能会出现断电宕机等故障, consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

Kafka 0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

3.6.4. Rebalance机制

消费组有多个消费者,消费组在消费一个Topic的时候,kafka为了保证消息消费不重不漏,kafka将每个partition唯一性地分配给了消费者。但是如果某个消费组在消费的途中有消费者宕机或者有新的消费者加入的时候那么partition分配就是不公平的,可能导致某些消费者负载特别重,某些消费者又没有负载的情况。Kafka有一种专门的机制处理这种情况,这种机制称为Rebalance机制。

当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:

  1. 消费组成员发生了变更,比如有新的消费者加入了消费组组或者有消费者宕机
  2. 消费者无法在指定的时间之内完成消息的消费
  3. 消费组订阅的Topic发生了变化
  4. 订阅的Topic的partition发生了变化
1. Coordinator

英[kəʊˈɔːdɪneɪtə] 美[koʊˈɔːrdɪneɪtər]

在介绍Rebalance机制之前,先介绍一下Coordinator,它是Rebalance机制中非常重要的一个角色。每个消费组都会有一个coordinator,Coordinator负责处理管理组内的消费者和位移管理,Coordinator并不负责消费组内的partition分配。消费者通过心跳的方式告知Coordinator自己仍然处于存活状态,Coordinator以session. timeout. ms参数的频率检测消费组group内消费者存活情况,该参数的默认值是10s,如果该值太大,那么coordinator需要非常长时间才能检测到消费者宕机。

consumer如何向coordinator证明自己还活着? 通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其他consumer:不好意思各位,你们重新申请加入组吧!

选举机制
如果kafka集群有多个broker节点,消费组会选择哪个partition节点作为Coordinator节点呢?它会通过如下公式,其中的50代表着kafka内部主题consumer offset的分区总数

Math.abs(hash(groupID)) % 50

那么当前Consumer Group的Coordinator就是上述公式计算出的partition的leader partition

2. Coordinator生命周期

Coordinator生命周期中总共有5种状态,Down,Initialize,Stable,Joining,AwaitingSync

Down:Coordinator不会维护任何消费组状态

Initialize:Coordinator处于初始化状态,Coordinator从Zookeeper中读取相关的消费组数据,这个时候Coordinator对接受到消费者心跳或者加入组的请求都会返回错误

Stable:Coordinator处理消费者心跳请求,但是还未开始初始化generation,Coordinator正在等待消费者加入组的请求

Joining:Coordinator正在处理组内成员加入组的请求

AwaitingSync:等待leader consumer分配分区,并将分区分配结果发送给各个Consumer

3. Rebalance流程

Rebalance的前提是Coordinator已经确定了。

Coordinator发生Rebalance的时候,Coordinator并不会主动通知组内的所有Consumer重新加入组,而是当Consumer向Coordinator发送心跳的时候,Coordinator将Rebalance的状况通过心跳响应告知Consumer。Rebalance机制整体可以分为两个步骤,一个是Joining the Group,另外一个是分配Synchronizing Group State

Joining the Group

在当前这个步骤中,所有的消费者会和Coordinator交互,请求Coordinator加入当前消费组。Coordinator会从所有的消费者中选择一个消费者作为leader consumer, 选择的算法是随机选择,并把组成员信息以及订阅信息发给leader。

rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

  • Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
  • LeaveGroup请求:主动告诉coordinator我要离开consumer group
  • SyncGroup请求:group leader把分配方案告诉组内所有成员
  • JoinGroup请求:成员请求加入组
  • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

Coordinator在rebalance的时候主要用到了前面4种请求。

Synchronizing Group State

这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

leader Consumer从Coordinator获取所有的消费者的信息,并将消费组订阅的partition分配结果封装为SyncGroup请求,需要注意的是leader Consumer不会直接与组内其它的消费者交互,leader Consumer会将SyncGroup发送给Coordinator,Coordinator再将分配结果发送给各个Consumer。分配partition有如下3种策略RangeAssignor,RoundRobinAssignor,StickyAssignor。

4. Generation机制

在上文中提到消费者消费消息超时之后,如果再次尝试提交offset,就会出现如下的异常

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 

出现该异常的原因是Coordinator消费组的保护机制。上文提到如果消费者消费超时,笔者称其为TimeoutConsumer,那么TimeoutConsumer就会被Coordinator从消费组中剔除,Coordinator就会进行Rebalance,将当前消费者负责的partition重新分配给其它的消费者,如果TimeoutConsumer完成了消息的消费,假设TimeoutConsumer成功提交partition的offset,那么就会出现混乱,因为TimeoutConsumer负责的partition已经被分配给了其它的消费者。Generation(代际)机制就是上述的保护机制。

Coordinator每进行一次Rebalance,就会为当前的Rebalance设置一个Generation标记,比如说第一次Rebalance标记是1,如果再次Rebalance,该标记就会成为2,消费者在提交offset的时候会将generation一同提交,Coordinator在发现TimeoutConsumer的标记已经超时的情况下会拒绝消费者提交generation标记。

Generation的机制可能会导致上一代际消费者和当前代际消费者消费相同的消息,所以消费者在消费消息的时候需要实现消息消费的幂等性。

5. Leader Consumer

上文提到Leader Consumer是Coordinator在Joining the Group步骤的时候随机选择的,Leader Consumer负责组内各个Consumer的partition分配,除此之外Leader Consumer还负责整个消费组订阅的主题的监控,Leader Consumer会定期更新消费组订阅的主题信息,一旦发现主题信息发生了变化,Leader Consumer会通知Coordinator触发Rebalance机制。

6. rebalance场景剖析 1. 新成员加入组(member join)

2. 组成员崩溃(member failure)

组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期才能检测到这种崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。okay,直接上图:

3. 组成员主动离组(member leave group)

4. 提交位移(member commit offset)

回到顶部


3.7 延迟队列消费

先看一下kafka里面哪些地方需要有任务要进行延迟调度。

第一类延时的任务:

比如说producer的acks=-1,必须等待leader和follower都写完才能返回响应。

有一个超时时间,默认是30秒(request.timeout.ms)。

所以需要在写入一条数据到leader磁盘之后,就必须有一个延时任务,到期时间是30秒延时任务 放到DelayedOperationPurgatory(延时管理器)中。

假如在30秒之前如果所有follower都写入副本到本地磁盘了,那么这个任务就会被自动触发苏醒,就可以返回响应结果给客户端了,否则的话,这个延时任务自己指定了最多是30秒到期,如果到了超时时间都没等到,就直接超时返回异常。

第二类延时的任务:

follower往leader拉取消息的时候,如果发现是空的,此时会创建一个延时拉取任务

延时时间到了之后(比如到了100ms),就给follower返回一个空的数据,然后follower再次发送请求读取消息,但是如果延时的过程中(还没到100ms),leader写入了消息,这个任务就会自动苏醒,自动执行拉取任务。

海量的延时任务,需要去调度。

3.8 时间轮机制 3.8.1 什么会有要设计时间轮?

Kafka内部有很多延时任务,没有基于JDK Timer来实现,那个插入和删除任务的时间复杂度是O(nlogn),而是基于了自己写的时间轮来实现的,时间复杂度是O(1),依靠时间轮机制,延时任务插入和删除,O(1)

3.8.2 时间轮是什么?

其实时间轮说白其实就是一个数组。

  • tickMs:时间轮间隔 1ms
  • wheelSize:时间轮大小 20
  • interval:timckMS * whellSize,一个时间轮的总的时间跨度。20ms
  • currentTime:当时时间的指针。
  • a:因为时间轮是一个数组,所以要获取里面数据的时候,靠的是index,时间复杂度是O(1)
  • b:数组某个位置上对应的任务,用的是双向链表存储的,往双向链表里面插入,删除任务,时间复杂度也是O(1)
3.8.3 多层级的时间轮

比如:要插入一个110毫秒以后运行的任务。

  • tickMs:时间轮间隔 20ms
  • wheelSize:时间轮大小 20
  • interval:timckMS * whellSize,一个时间轮的总的时间跨度。20ms
  • currentTime:当时时间的指针。

第一层时间轮:1ms * 20
第二层时间轮:20ms * 20
第三层时间轮:400ms * 20

回到顶部


4. kafka为什么那么快 4.1. 顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。 官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4.2. 零拷贝技术 4.2.1 DMA

DMA,全称叫Direct Memory Access,一种可让某些硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。听着是不是很厉害,跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成。而DMA,则可以绕过CPU,硬件自己去直接访问系统主内存。

很多硬件都支持DMA,这其中就包括网卡。

4.2.2 传统消费者读取数据流程
  • 消费者发送请求给kafka服务
  • kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
  • 从磁盘读取了数据到os cache缓存中
  • os cache复制数据到kafka应用程序中
  • kafka将数据(复制)发送到socket cache中
  • socket cache通过网卡传输给消费者


从上图中可以看出,从数据读取到发送一共经历了四次数据拷贝,具体流程如下:

第一次数据拷贝: 当用户进程发起 read() 调用后,上下文从用户态切换至内核态。DMA 引擎从文件中读取数据,并存储到 Page Cache (内核态缓冲区)。

第二次数据拷贝: 请求的数据从内核态缓冲区拷贝到用户态缓冲区,然后返回给用户进程。同时会导致上下文从内核态再次切换到用户态。

第三次数据拷贝: 用户进程调用 send() 方法期望将数据发送到网络中,此时用户态会再次切换到内核态,请求的数据从用户态缓冲区被拷贝到 Socket 缓冲区。

第四次数据拷贝: send() 系统调用结束返回给用户进程,再次发生上下文切换。此次操作会异步执行,从 Socket 缓冲区拷贝到协议引擎中。

4.2.3 kafka sendfile技术 — 零拷贝
  • 消费者发送请求给kafka服务
  • kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
  • 从磁盘读取了数据到os cache缓存中
  • os cache直接将数据发送给网卡
  • 通过网卡将数据传输给消费者

    从上图中可以看出,从数据读取到发送一共经历了三次数据拷贝,减少了一次,具体流程如下:

用户进程调用 FileChannel#transferTo(),上下文从用户态切换至内核态。

第一次数据拷贝:DMA 从文件中读取数据,并存储到 Page Cache。

第二次数据拷贝:CPU 将 Page Cache 中的数据拷贝到 Socket 缓冲区。

第三次数据拷贝:DMA 将 Socket 缓冲区数据拷贝到网卡进行数据传输。

Kafka 在这里采用的方案是通过 NIO 的 transferTo/transferFrom 调用操作系统的 sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 一次 CPU 数据拷贝

4.3 Page Cache

为什么需要 Page Cache?

充当缓存的作用,这样就可以实现文件数据的预读,提升 I/O 的性能。 可以理解为:批量数据刷盘。

“引入 Cache 层的目的是为了提高 Linux 操作系统对磁盘访问的性能。Cache 层在内存中缓存了磁盘上的部分数据。当数据的请求到达时,如果在 Cache 中存在该数据且是最新的,则直接将数据传递给用户程序,免除了对底层磁盘的操作,提高了性能。Cache 层也正是磁盘 IOPS 为什么能突破 200 的主要原因之一。在 Linux 的实现中,文件 Cache 分为两个层面,一是 Page Cache,另一个 Buffer Cache,每一个 Page Cache 包含若干 Buffer Cache。Page Cache 主要用来作为文件系统上的文件数据的缓存来用,尤其是针对当进程对文件有 read/write 操作的时候。Buffer Cache 则主要是设计用来在系统对块设备进行读写的时候,对块进行数据缓存的系统来使用。

使用 Page Cache 的好处:

  • I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能
  • I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
  • 充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担 读操作可直接在 Page Cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 Page Cache)交换数据
  • 如果进程重启,JVM 内的 Cache 会失效,但 Page Cache 仍然可用

Broker 收到数据后,写磁盘时只是将数据写入 Page Cache,并不保证数据一定完全写入磁盘。从这一点看,可能会造成机器宕机时,Page Cache 内的数据未写入磁盘从而造成数据丢失。但是这种丢失只发生在机器断电等造成操作系统不工作的场景,而这种场景完全可以由 Kafka 层面的 Replication 机制去解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。也正因如此,Kafka 虽然提供了 flush.messages 和 flush.ms 两个参数将 Page Cache 中的数据强制 Flush 到磁盘,但是 Kafka 并不建议使用。

4.4 批处理

在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络IO。

因此,除了操作系统提供的低级批处理之外,Kafka 的客户端和 broker 还会在通过网络发送数据之前,在一个批处理中累积多条记录 (包括读和写)。记录的批处理分摊了网络往返的开销,使用了更大的数据包从而提高了带宽利用率。

kafka可以将数据记录分批发送,从生产者到文件系统(kafka主题日志)到消费者,可以端到端的查看这些批次的数据。

4.5 数据压缩

Producer 可将数据压缩后发送给 broker,从而减少网络传输代价,目前支持的压缩算法有:Snappy、Gzip、LZ4。数据压缩一般都是和批处理配套使用来作为优化手段的。

Producer 使用压缩算法压缩消息后发送给服务器后,由consumer消费者进行解压。

回到顶部


5. zookeeper在kafka中的作用

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

以下为 partition 的 leader 选举过程:

6. kafka事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败.

  1. Producer 事务
    为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的 PID。为了管理 Transaction, Kafka 引入了一个新的组件 Transaction Coordinator。 Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。 Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

  2. Consumer 事务
    上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

回到顶部


7. kafka安装部署 7.1 安装JAVA JDK

1、下载JDK安装包
注意:根据32/64位操作系统下载对应的安装包

2、添加系统变量:JAVA_HOME=C:Program Files (x86)Javajdk1.8.0_14

7.2 安装ZooKeeper
  1. 下载安装包Apache ZooKeeper

  2. 解压并进入ZooKeeper目录,如:D:Kafkazookeeper-3.4.9conf, 将“zoo_sample.cfg”重命名为“zoo.cfg”,

  3. 打开“zoo.cfg”找到并编辑数据存放位置 dataDir=F:HCapache-zookeeper-3.7.0data(必须以分割)

  4. 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)

  5. 添加系统变量:ZOOKEEPER_HOME=D:Kafkazookeeper-3.4.9

  6. 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%bin

  7. 打开新的cmd,输入“zkServer“,运行Zookeeper

命令行提示如下:说明本地Zookeeper启动成功


注意:不要关了这个服务窗口

7.3 安装Kafka

1、 下载安装包
Apache Kafka

注意要下载二进制版本

2、 解压并进入Kafka目录,笔者:D:Kafkakafka_2.12-0.11.0.0

3、 进入config目录找到文件server.properties并打开

4、 找到并编辑日志保存的位置 log.dirs=F:HCkafka_2.13-2.8.1kafka-logs

5、 找到并编辑zookeeper.connect=localhost:2181

6、 Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181

7、 进入Kafka安装目录D:Kafkakafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:

.binwindowskafka-server-start.bat .configserver.properties
或binkafka-server-start.sh configserver.properties


注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行

回到顶部


8. Spring Boot整合Kafka

在pom.xml中添加以下依赖。


    org.apache.kafka
    kafka-clients
    2.4.0


    org.slf4j
    slf4j-log4j12
    1.7.6

修改配置文件kafka.properties

server:
  port: 8080

spring:
	kafka:
	    #bootstrap-servers: server1:9092,server2:9093 #kafka开发地址,
	    #生产者配置
	    producer:
	      # Kafka提供的序列化和反序列化类
	      key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
	      value-serializer: org.apache.kafka.common.serialization.StringSerializer
	      retries: 1 # 消息发送重试次数
	      #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果
	      #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。
	      #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量
	      acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
	      batch-size: 16384 #批量大小
	      properties:
	        linger:
	          ms: 0 #提交延迟
	      buffer-memory: 33554432 # 生产端缓冲区大小
	    # 消费者配置
	    consumer:
	      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      # 分组名称
	      group-id: web
	      enable-auto-commit: false
	      #提交offset延时(接收到消息后多久提交offset)
	      # auto-commit-interval: 1000ms
	      #当kafka中没有初始offset或offset超出范围时将自动重置offset
	      # earliest:重置为分区中最小的offset;
	      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
	      # none:只要有一个分区不存在已提交的offset,就抛出异常;
	      auto-offset-reset: latest
	      properties:
	        #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
	        session.timeout.ms: 15000
	        #消费请求超时时间
	        request.timeout.ms: 18000
	      #批量消费每次最多消费多少条消息
	      #每次拉取一条,一条条消费,当然是具体业务状况设置
	      max-poll-records: 1
	      # 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒;
	      heartbeat-interval: 6000
	      # 发出请求时传递给服务器的 ID。用于服务器端日志记录 正常使用后解开注释,不然只有一个节点会报错
	      #client-id: mqtt
	    listener:
	      #消费端监听的topic不存在时,项目启动会报错(关掉)
	      missing-topics-fatal: false
	      #设置消费类型 批量消费 batch,单条消费:single
	      type: single
	      #指定容器的线程数,提高并发量
	      #concurrency: 3
	      #手动提交偏移量 manual达到一定数据后批量提交
	      #ack-mode: manual
	      ack-mode: MANUAL_IMMEDIATE #手動確認消息
	        # 认证
	    #properties:
	      #security:
	        #protocol: SASL_PLAINTEXT
	      #sasl:
	        #mechanism: SCRAM-SHA-256
	        #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'

发送消息

KafkaProducerDemo.java发送消息

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
//如果是SSL接入点实例或者SASL接入点实例,请注释以下第一行代码。
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


public class KafkaProducerDemo {

    public static void main(String args[]) {
          
       
         
       
       
       

        //加载kafka.properties。
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
         
       

       

       

        //消息队列Kafka版消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //请求的最长等待时间。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔。
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
         
       

        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个。
        KafkaProducer producer = new KafkaProducer(props);

        //构造一个消息队列Kafka版消息。
        String topic = kafkaProperties.getProperty("topic"); //消息所属的Topic,请在控制台申请之后,填写在这里。
        String value = "this is the message's value"; //消息的内容。

        try {
            //批量获取Future对象可以加快速度,但注意,批量不要太大。
            List> futures = new ArrayList>(128);
            for (int i =0; i < 100; i++) {
                //发送消息,并获得一个Future对象。
                ProducerRecord kafkaMessage =  new ProducerRecord(topic, value + ": " + i);
                Future metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);

            }
            producer.flush();
            for (Future future: futures) {
                //同步获得Future对象的结果。
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //客户端内部重试之后,仍然发送失败,业务要应对此类错误。
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}

订阅消息

  • 单Consumer订阅消息:编译并运行KafkaConsumerDemo.java发送消息。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;


public class KafkaConsumerDemo {

    public static void main(String args[]) {

        //设置JAAS配置文件的路径。
        
                        
        
                        
        

        //加载kafka.properties
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //如果是SSL接入点实例,请注释以下第一行代码。
        //可更加实际拉去数据和客户的版本等设置此值,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        

        //如果是SASL接入点PLAIN机制实例,请注释以下一行代码。
       //可更加实际拉去数据和客户的版本等设置此值,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        

        //如果是SASL接入点SCRAM机制实例,请注释以下一行代码。
       //可更加实际拉去数据和客户的版本等设置此值,默认30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        

        //每次poll的最大数量。
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        
        //如果是SSL接入点实例,请取消注释以下一行代码。
        //Hostname校验改成空。
        //props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
        //设置消费组订阅的Topic,可以订阅多个。
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
        List subscribedTopics =  new ArrayList();
        
        //如果是SSL接入点实例,请注释以下前五行代码,取消注释第六行代码。
        //如果需要订阅多个Topic,则在这里add进去即可。
        //每个Topic需要先在控制台进行创建。
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic: topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        //循环消费消息。
        while (true){
            try {
                ConsumerRecords records = consumer.poll(1000);
                //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                //建议开一个单独的线程池来消费消息,然后异步返回结果。
                for (ConsumerRecord record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
                //参考常见报错: 使用消息队列Kafka版时客户端的报错及解决方案
                e.printStackTrace();
            }
        }
    }
}
  • 多Consumer订阅消息:编译并运行KafkaMultiConsumerDemo.java消费消息。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
//如果是SSL接入点实例或者SASL接入点实例,请取消注释以下第一行代码。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.errors.WakeupException;


public class KafkaMultiConsumerDemo {

    public static void main(String args[]) throws InterruptedException {
        
        //设置JAAS配置文件的路径。
        
                            
        
                            
        


        //加载kafka.properties。
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        
        
        
        

        

        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //每次Poll的最大数量。
        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        

        int consumerNum = 2;
        Thread[] consumerThreads = new Thread[consumerNum];
        for (int i = 0; i < consumerNum; i++) {
            KafkaConsumer consumer = new KafkaConsumer(props);

            List subscribedTopics = new ArrayList();
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);

            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
            consumerThreads[i] = new Thread(kafkaConsumerRunner);
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].start();
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].join();
        }
    }

    static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    try {
                        ConsumerRecords records = consumer.poll(1000);
                        //必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                        for (ConsumerRecord record : records) {
                            System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                        }
                    } catch (Exception e) {
                        try {
                            Thread.sleep(1000);
                        } catch (Throwable ignore) {

                        }
                        e.printStackTrace();
                    }
                }
            } catch (WakeupException e) {
                //如果关闭则忽略异常。
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }
        //可以被另一个线程调用的关闭Hook。
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

回到顶部


9. 总结

回到顶部


9.1 可以简述下Kafka架构中比较重要的关键字吗?比如Partition,Broker,你都是怎么理解的?

关于Kafka我做了一些深入了解,它的设计思路还是很值得借鉴的,这其中有6个比较关键的名字概念,弄懂这几个概念才能更好地了解Kafka的工作机制。

  1. Producer
    消息的生产方,如支付系统确认用户已经支付,支付系统要通知订单系统和物流系统,支付系统就是生产者。

  2. Consumer
    消费的接收方,Producer 的案例中,物流系统就是消费方,前两个都比较简单,我就不多说了。

  3. Topic
    每条发布到MQ集群的消息都有一个类别,这个类别被称为topic,可以理解成一类消息的名字。所有的消息都已topic作为单位进行归类。

  4. Partition
    Kafka 物理上分区的概念,每个 Topic 会分散在一个或多个 Partition。一个 Topic 的数据太大了,就分成小片,Kafka 为分区引入多副本模型,副本之间采用“一个leader多follower”的设计,通过多副本实现故障自动转移,保证可用性。

  5. Broker:
    可以理解成一个服务器的节点,集群包含一个或多个服务器,这种服务器被称为 broker。对应用来说,生产者把消费发出去了,就不管了。消费者慢条斯理地按照自己的速率来消费。这段时间可能有大量消息产生,消费者压力还是在一定范围内。做生产者和消费者之间解耦的就是一个缓存服务broker。

  6. Kafka Cluster
    集群就是 Broker 的集合,多个 Broker 组成一个高可用集群。

Producer 与 Consumer的关系

topic 和 Partition 的关系
一个 topic 可以分别存储到多个 Partition,每个 Partition 有序的。

9.2 那我们为什么要选择 Kafka 呢?

Kafka 特有的功能:

相比同类中间件 RabbitMQ or ActiveMQ,Kafka 支持批量拉取消息,大大增加了Kafka的消息吞吐量。

支持多种发送场景:

  1. 发送并忘记。
  2. 同步发送 。
  3. 异步发送+回调函数。

3种方式虽然在时间上有所差别,但并不是说时间越快的越好,具体使用哪种方式要看具体的业务场景,比如业务要求消息必须是按顺序发送,可以使用第2种同步发送,并且只能在一个partation上。如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式。如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息

分布式可高可扩展。Kafka 集群可以透明的扩展,增加新的服务器进集群。

只说了 Kafka 的优势,那别的同类产品就不好了吗?当然不是,存在即真理,每个产品能生存下来,一定有它自己的优势,比如 RabbitMQ,在吞吐量方面稍逊于 Kafka ,但是他们的出发点不一样,RabbitMQ 支持对消息的可靠的传递,支持事务,不支持批量的操作,技术选型中,选择最适合你的,你最了解熟悉的。

9.3 那为什么Kafka的吞吐量远高于其他同类中间件?

Kafka 是一个高吞吐量分布式消息系统,并且提供了持久化。其高性能的有两个重要特点:

  1. 利用了磁盘连续读写性能远远高于随机读写的特点,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
  2. 并发,将一个topic拆分多个partition,kafka读写的单位是partition,因此,将一个topic拆分为多个partition可以提高吞吐量。但是,这里有个前提,就是不同partition需要位于不同的磁盘(可以在同一个机器)。如果多个partition位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性。在linkedlin的测试中,每台机器就加载了6个磁盘,并且不做raid,就是为了充分利用多磁盘并发读写,又保证每个磁盘连续读写的特性。

    同一个topic会被分散到多个分片上,并行处理。
9.3 如何避免重复消费?

分析原因:

  1. 生产者重复提交
  2. rebalence引起重复消费
    超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

解决方案:

  1. 提高消费速度

    • 增加消费者
    • 多线程消费
    • 异步消费
    • 调整消费处理时间
  2. 幂等处理

    • 消费者设置幂等校验
    • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能。
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
9.4 如何延迟消费?

kafka是无状态的,没有延迟的功能。pulsar和rabbitmq实现更加方便。


开发延迟推送服务,定时检索延迟消息,发送给kafka。

9.5 频繁rebanlence怎么解决?

问题原因

  • v0.10.2之前版本的客户端:Consumer没有独立线程维持心跳,而是把心跳维持与poll接口耦合在一起。其结果就是,如果用户消费出现卡顿,就会导致Consumer心跳超时,引发Rebalance。
  • v0.10.2及之后版本的客户端:如果消费时间过慢,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance。

触发时机:

  1. consumer个数变化
  2. 订阅topic个数变化
  3. 订阅的topic的partition变化

频繁出现rebalence,可能是消费者的消费时间过长,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance。

解决方案:

  • 参数调整:
    • session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
    • max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> *的积。
    • max.poll.interval.ms: 该值要大于 / (<单个线程每秒消费的条数> *<消费线程的个数>)的值。
  • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
  • 减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。

批量消费代码

import com.ctrip.framework.apollo.ConfigService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class BehaviorConsumerConfig {

    public Map consumerConfigs() {
        Map propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
            propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);

        propsMap.put("security.protocol", protocol);
        propsMap.put("ssl.truststore.location", truststoreLocation.replaceAll("file://", ""));
        propsMap.put("ssl.truststore.password", truststorePassword);
        propsMap.put("login.config.location", loginConfigLocation);
        propsMap.put("sasl.mechanism", mechanism);
        return propsMap;
    }

    @Bean("batchContainerFactory")
    KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));

        // 并发创建的消费者数量
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(3000);

        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }
}

回到顶部


转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1041011.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号