说说 MQ 之 Kafka ( 三 )

2018 年 10 月 31 日 ImportNew

(点击上方公众号,可快速关注)


来源:Valleylord ,

valleylord.github.io/post/201607-mq-kafka/


Kafka 副本和集群


在生产环境中,Kafka 总是以“集群+分区”方式运行的,以保证可靠性和性能。下面是一个3副本的 Kafka 集群实例。


首先,需要启动3个 Kafka Broker,Broker 的配置文件分别如下,


broker.id=0

listeners=PLAINTEXT://192.168.232.23:9092

log.dirs=/tmp/kafka-logs

broker.id=1

listeners=PLAINTEXT://192.168.232.23:9093

log.dirs=/tmp/kafka-logs-1

broker.id=1

listeners=PLAINTEXT://192.168.232.23:9094

log.dirs=/tmp/kafka-logs-2


虽然每个 Broker 只配置了一个端口,实际上,Kafka 会多占用一个,可能是用来 Broker 之间的复制的。另外,3个 Broker 都配置了,


zookeeper.connect=localhost:2181

delete.topic.enable=true


在同一个 Zookeeper 上的 Broker 会被归类到一个集群中。注意,这些配置中并没有指定哪一个 Broker 是主节点,哪些 Broker 是从节点,Kafka 采用的办法是从可选的 Broker 中,选出每个分区的 Leader。也就是说,对某个 Topic 来说,可能0节点是 Leader,另外一些 Topic,可能1节点是 Leader;甚至,如果 topic1 有2个分区的话,分区1的 Leader 是0节点,分区2的 Leader 是1节点。


这种对等的设计,对于故障恢复是十分有用的,在节点崩溃的时候,Kafka 会自动选举出可用的从节点,将其升级为主节点。在崩溃的节点恢复,加入集群之后,Kafka 又会将这个节点加入到可用节点,并自动选举出新的主节点。


实验如下,先新建一个3副本,2分区的 Topic,


bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1


初始状况下,topic1 的状态如下,


$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0


对于上面的输出,即使没有文档,也可以看懂大概:topic1 有2个分区,Partition 0 和 Partition 1,Leader 分别在 Broker 0 和 1。Replicas 表示副本在哪些 Broker 上,Isr(In-Sync Replicas)表示处于同步状态中的 Broker,如果有 Broker 宕机了,那么 Replicas 不会变,但是 Isr 会仅显示没有宕机的 Broker,详见下面的实验。


然后分2个线程,运行之前写的 Producer 和 Consumer 的示例代码,Producer 采用异步发送,消息采用同步复制。在有消息传送的情况下,kill -9 停掉其中2个 Broker(Broker 0 和 Broker 1),模拟突然宕机。此时,topic1 状态如下,


$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2

        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2


可见,Kafka 已经选出了新的 Leader,消息传送没有中断。接着再启动被停掉的那两个 Broker,并查看 topic1 的状态,如下,


$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0

        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2,1,0

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0

        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0


可以发现, 有一个短暂的时间,topic1 的两个分区的 Leader 都是 Broker 2,但是在 Kafka 重新选举之后,分区1的 Leader 变为 Broker 1。说明 Kafka 倾向于用不同的 Broker 做分区的 Leader,这样更能达到负载均衡的效果。


再来看看 Producer 和 Consumer 的日志,下面这个片段是2个 Broker 宕机前后的日志,


......

Send     message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms

Received message: (00438, Message_00438) at offset 216

Send     message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms

Send     message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms

Received message: (00441, Message_00441) at offset 221

Received message: (00439, Message_00439) at offset 217

Send     message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms

Send     message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms

Received message: (00440, Message_00440) at offset 218

Received message: (00443, Message_00443) at offset 219

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

Received message: (00442, Message_00442) at offset 222

Send     message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms

Send     message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms

Send     message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms

Send     message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms

Send     message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms

Send     message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms

Send     message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms

Send     message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms

Send     message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms

Send     message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms

......

Send     message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms

Received message: (00631, Message_00631) at offset 310

Received message: (00633, Message_00633) at offset 311

Send     message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms

Received message: (00634, Message_00634) at offset 312

Send     message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms

Received message: (00639, Message_00639) at offset 313

Send     message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms

Received message: (00641, Message_00641) at offset 314

Send     message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms

Received message: (00643, Message_00643) at offset 315

......


出现错误的时候,Producer 抛出了 NetworkException 异常。其中有3589条 Received 日志,3583条 Send 日志,7条 NetworkException 异常日志,发送消息的最大序号是3590,接收消息的最大序号是3589,有以下几个值得注意的地方,


  1. 宕机之前,消息的接收并不是顺序的,这是因为 topic1 有2个分区,Kafka 只保证分区上的有序;

  2. 宕机之后,出现了长段的发送日志而没有接收日志,说明 Kafka 此时正在选举,选举的过程会阻塞消费者;

  3. 从接收消息的条数和序号来看,所有的消息都收到了,没有丢(没有收到3590的消息可能是因为强制退出 client 进程的原因),发送的过程的7个异常应该只是虚警,7条异常对应序号444~450,3583条 Send 消息再加上这7条,与总消息3590条一致;


从这个实验中,可以看到,虽然 Kafka 不保证消息重复发送,但是却在尽量保证没有消息被重复发送,可能我的实验场景还不够极端,没有做出消息重复的情况。


如之前所说,如果要保持完全顺序性,需要使用单分区;如果要避免抛出 NetworkException 异常,就使用 Producer 同步发送。下面,我们重做上面的例子,不同之处是使用单分区和 Producer 同步发送,截取一段 Broker 宕机时的日志如下,


......

Sent message: (118, Message_00118)

Received message: (00118, Message_00118) at offset 117

Received message: (00119, Message_00119) at offset 118

Sent message: (119, Message_00119)

Sent message: (120, Message_00120)

Received message: (00120, Message_00120) at offset 119

Sent message: (121, Message_00121)

Received message: (00121, Message_00121) at offset 120

Sent message: (122, Message_00122)

Sent message: (123, Message_00123)

Sent message: (124, Message_00124)

Sent message: (125, Message_00125)

Sent message: (126, Message_00126)

Sent message: (127, Message_00127)

......


可见,由于采用同步发送,Broker 宕机并没有造成抛出异常,另外,由于使用单分区,顺序性也得到了保证,全局没有出现乱序的情况。


综上,是否使用多分区更多的是对顺序性的要求,而使用 Producer 同步发送还是异步发送,更多是出于重复消息的考虑,如果异步发送抛出异常,在保证不丢消息的前提下,势必要重发消息,这就会导致收到重复消息。多分区和 Producer 异步发送,会带来性能的提升,但是也会引入非顺序性,重复消息等问题,如何取舍要看应用的需求。


Kafka 最佳实践


Kafka 在一些应用场景中,有一些前人总结的最佳实践 8 9。对最佳实践,我的看法是,对于自己比较熟悉,有把握的部分,可以按自己的步骤进行;对一些自己不清楚的领域,可以借鉴其中的一些内容,至少不会错的特别厉害。有文章10说,Kafka 在分区比较多的时候,相应时间会变长,这个现象值得在实践中注意。


后记


在 Kafka 与 RocketMQ 的对比中,RocketMQ 的一个核心功能就是可以支持同步刷盘,此时,即使突然断电,也可以保证消息不丢;而 Kafka 采用的是异步刷盘,即使返回写入成功,也只是写入缓冲区成功,并非已经持久化。因此,如果出现断电或 kill -9 的情况,Kafka 内存中的消息可能丢失。另外,同步刷盘的效率是比较低下的,一般生产中估计也不会使用,可以用优雅关闭的方式来关闭进程。如果不考虑这些极端情况的话,Kafka 基本是一个很可靠的消息中间件。


参考文章


  • http://kafka.apache.org/documentation.html

  • http://www.jianshu.com/p/453c6e7ff81c

  • http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章

  • http://developer.51cto.com/art/201501/464491.htm

  • https://segmentfault.com/q/1010000004292925

  • http://www.cnblogs.com/gnivor/p/5318319.html

  • http://www.cnblogs.com/davidwang456/p/4313784.html

  • http://www.jianshu.com/p/8689901720fd

  • http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/

  • http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/


系列



【关于投稿】


如果大家有原创好文投稿,请直接给公号发送留言。


① 留言格式:
【投稿】+《 文章标题》+ 文章链接

② 示例:
【投稿】《不要自称是程序员,我十多年的 IT 职场总结》:http://blog.jobbole.com/94148/

③ 最后请附上您的个人简介哈~



看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

登录查看更多
0

相关内容

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。
【实用书】流数据处理,Streaming Data,219页pdf
专知会员服务
76+阅读 · 2020年4月24日
【图神经网络(GNN)结构化数据分析】
专知会员服务
115+阅读 · 2020年3月22日
【2020新书】Kafka实战:Kafka in Action,209页pdf
专知会员服务
67+阅读 · 2020年3月9日
TensorFlow Lite指南实战《TensorFlow Lite A primer》,附48页PPT
专知会员服务
69+阅读 · 2020年1月17日
【干货】大数据入门指南:Hadoop、Hive、Spark、 Storm等
专知会员服务
95+阅读 · 2019年12月4日
滴滴离线索引快速构建FastIndex架构实践
InfoQ
21+阅读 · 2020年3月19日
在K8S上运行Kafka合适吗?会遇到哪些陷阱?
DBAplus社群
9+阅读 · 2019年9月4日
漏洞预警丨Xstream远程代码执行漏洞
FreeBuf
4+阅读 · 2019年7月25日
浅谈 Kubernetes 在生产环境中的架构
DevOps时代
11+阅读 · 2019年5月8日
Linux挖矿病毒的清除与分析
FreeBuf
14+阅读 · 2019年4月15日
消息队列技术点梳理(思维导图版)
架构文摘
3+阅读 · 2018年4月3日
基于 Storm 的实时数据处理方案
开源中国
4+阅读 · 2018年3月15日
机器学习(26)之K-Means实战与调优详解
机器学习算法与Python学习
4+阅读 · 2017年11月19日
Accelerated Methods for Deep Reinforcement Learning
Arxiv
6+阅读 · 2019年1月10日
Arxiv
8+阅读 · 2018年1月30日
Arxiv
3+阅读 · 2015年5月16日
VIP会员
相关VIP内容
相关资讯
滴滴离线索引快速构建FastIndex架构实践
InfoQ
21+阅读 · 2020年3月19日
在K8S上运行Kafka合适吗?会遇到哪些陷阱?
DBAplus社群
9+阅读 · 2019年9月4日
漏洞预警丨Xstream远程代码执行漏洞
FreeBuf
4+阅读 · 2019年7月25日
浅谈 Kubernetes 在生产环境中的架构
DevOps时代
11+阅读 · 2019年5月8日
Linux挖矿病毒的清除与分析
FreeBuf
14+阅读 · 2019年4月15日
消息队列技术点梳理(思维导图版)
架构文摘
3+阅读 · 2018年4月3日
基于 Storm 的实时数据处理方案
开源中国
4+阅读 · 2018年3月15日
机器学习(26)之K-Means实战与调优详解
机器学习算法与Python学习
4+阅读 · 2017年11月19日
Top
微信扫码咨询专知VIP会员