最新消息: 关于Git&GitHub 版本控制你了解多少?
您现在的位置是:群英 > 服务器 > 云计算 >
详细讲解kafka原理和性能分析测试
网络发表于 2020-09-04 15:59 次浏览

1、Kafka写数据流程:

  1. producer先从zookeeper的broker-list的节点找到partition(分区)的leader;
  2. producer将消息发送给该leader的partition;
  3. leader将消息写入本地log;
  4. followers从leader pull消息,实现replication的副本备份机制,同样写入本地log;
  5. replication写入本地log后向leader发送ack(确认);
  6. leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset),并向producer发送ACK。
  7. producer收到leader的ack,证明生产的数据已被kafka成功写入

总之,producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

  1. 存储策略
    无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
    1)基于时间:log.retention.hours=168
    2)基于大小:log.retention.bytes=1073741824
    需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

  2. 分发策略:

1) partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2) 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3) 如果既没指定partition,又没有设置key,则会轮询选出一个partition 。

  1. kafka高性能的原因:

1)Broker NIO异步消息处理:实现了IO线程与业务线程分离; 即生产者客户端缓存消息批量发送,消费者批量从broker获取消息,减少网络io次数,充分利用磁盘顺序读写的性能,并且还会将每个消息进行压缩传输。

2)磁盘顺序写:因为卡夫卡是将消息记录持久化到磁盘的,而磁盘的顺序读写性能是很高的;

3)零拷贝:采用了sendfile方法,从而允许操作系统将数据从Page Cache直接发送到网络socket缓冲区,只需要最后copy就可将数据复制到NIC缓冲区。

4)Page Cache:卡夫卡充分利用了操作系统的page cache机制,即利用操作系统自身的内存,而不是JVM空间内存,所以卡夫卡的读写操作基本上是基于内存的;

5)分区分段+索引:卡夫卡是采用分布式系统分区分桶的设计思想做的,所以 Kafka的message消息实际上是分布式存储在一个一个小的segment中的, Kafka又默认为分段后的数据文件建立了索引文件
 

2、kafka消费过程分析

(1)Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。

  • 多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id。
  • 同一个消费组的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!
  • **消费者数少于分区:**会出现某个消费者消费多个partition数据的情况(此时消费的速度不及只处理一个partition的消费者的处理速度)
  • **消费者数多于分区:**多出来的消费者不消费任何partition的数据。
  • 建议消费者组的consumer的数量与partition的数量一致!

(2)消费者模型:消费的消费模型有两种:推送模型(push)和拉取(pull)

​ 1) 基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。 消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的。

​ 2) 拉取模型 (kafka): 由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息 。 这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。

(3)消费高级API:

​ 1)高级API的优点:①写起来简单;不需要自行去管理offset,系统通过zookeeper自行管理 ; 不需要管理分区,副本等情况,系统自动管理 ;

​ ② 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset) ;

​ ③ 可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响) 。

​ 2)高级API的缺点:① 不能自行控制offset(对于某些特殊需求来说);② 不能细化控制如分区、副本、zk等 。

(4)消费低级API

​ 1)低级API的优点:

​ ① 能够让开发者自己控制offset,想从哪里读取就从哪里读取 ;

​ ② 自行控制连接分区,对分区自定义进行负载均衡 ;

​ ③对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中) 。

​ 2)低级API的缺点:

​ ① 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
 

3、kafka的ACK应答机制:

  1. 在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all(-1)。(保证消息不丢失)

(1) **0:**代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
(2)**1:**代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
(3)**all:**代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

 

4、设定消费位置

auto.offset.reset值含义解释
earliest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
latest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
none 
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

5、kafka几种消费方式

  1. 自动提交offset: 在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。

  2. 手动提交offset: 生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。

    手动提交方式一:同步提交 consumer.commitSync()方式提交

    手动提交方式二:异步提交

    consumer.commitAsync(callback)方式提交

  3. 手动提交partition的offset:

6、kafka的性能分析

producer

单线程:

原始数据:

0.0.0.0 0 dat	2019-03-11 22:00:00	2019-03-11 22:00:00	11.184.32.234	1	4	1940  L:0,P:2	52.229.174.233	443	223.146.131.254	5636	F<-C  unknown type: 259 2 L:1,P:2	52.229.174.233	443	223.146.131.254	5636	F->C  unknown type: 259 2 mac: 56:00:03:0d:c0:02<-00:04:19:00:02:98 unknown type: 259 2 6	1345	10	7136	2019-03-11 22:00:00 2019-03-11 22:00:00  sni 38 .array709-prod.do.dsp.mp.microsoft.com

每条消息会在生产后加上id进行标注,以防每条数据都长一个样子

改进前:

数据量 发送用时(ms)
10000 1 300
100000 16 000
1000000 150 000

改进后:

数据量 发送用时(ms)
1000 386
10000 1 364
100000 13 160
1000000 35 139
数据量 发送用时
1000 186ms
10000 1 079ms
100000 11 018ms
1000000 34 670sm
10000000 305 043ms

每个测试都经过超过30次的测试,

我们可以看到效率提高了接近5倍的生产效率

目前的问题:

  1. 发送百万数据的时候前面4-5秒速度很快50-60m/s,后面会降到1m/s
  2. 发送千万数据前面4-5秒速度很快50-60m/s,后面会降到3-4m/s

10个线程:

1万数据:总时间:2411ms

10万数据:总时间:19924ms

100万数据:总时间:180000ms

结论

producer端增加线程数量并不能提高发送速率,单线程producer不存在线程切换生产速率更快

开启压缩后

数据量 发送用时
10000 730ms
100000 794ms
1000000 3 592ms
10000000 33 260ms

速率再次提高了10倍左右

consumer

基于partitions为50的topic进行测试

消费可以达到千兆网卡满带宽:111M/s 持续消费无问题

总数据量:8580万 总时间:228674 ms 平均每秒处理数据:375000

总数据量:3560万 总时间:35390 ms 平均每秒处理数据:1005000

小结

  1. 生产者端不管是使用单例的producer多线程生产还是使用多个producer同时时生产数据均不会提高生产性能
  2. 最初到现在生产性能提高42.8倍,应该还有比较大的提升空间,
  3. consumer的速率提高主要的方法是增加partition数量
  4. consumer端不存在开启压缩的选项也就是topic是否开启压缩不会影响到consuemr 的消费速率。
  5. consumer的消费速率目前可以达到千兆网卡满带宽。稳定在112M/S的数据传输

topic开启压缩的方法

在集群中运行:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config compression.type=lz4

这里的 --alter 命令可以对已经创建的topic进行增加配置修改配置等

在producer中增加:

compression.type=lz4

Broker配置

broker.id=107
listeners=PLAINTEXT://172.16.2.107:9092
num.network.threads=25
num.io.threads=24
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/hadoop/soft/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=slave106:2181,slave107:2181,slave108:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable = false
delete.topic.enable=true

Producer配置

#指定接受消息的代理
bootstrap.servers=172.16.2.107:9092
#当至少有一个follow拷贝到leader中的消息后再进行提交消息已发送
acks=0
#重试次数
retries=3
#控制批量发送消息的大小,减少访问服务端的次数提高性能,16KB
batch.size=16384
#batch.size=32768
#控制消息每隔多少时间进行发送,减少访问服务端次数
linger.ms=500
#设置缓冲区大小,发送速度大于这个值的话会导致缓冲被耗尽,32MB
#buffer.memory=134217728
buffer.memory=268435456
# 设置消息压缩
compression.type=lz4
#设置键,值以什么序列的形式进行发送
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

Consumer配置

#只需要指定kafka集群中的一个ip就可以了
bootstrap.servers=172.16.2.107:9092
#每个group id都会收到一份kafka 内部的消息
group.id=test
#自动提交 offset
enable.auto.commit=false
#发送的每条消息的key,和value使用的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer= org.apache.kafka.common.serialization.StringDeserializer
 
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
相关信息推荐