总之,producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。
存储策略
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
分发策略:
1) partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2) 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3) 如果既没指定partition,又没有设置key,则会轮询选出一个partition 。
1)Broker NIO异步消息处理:实现了IO线程与业务线程分离; 即生产者客户端缓存消息批量发送,消费者批量从broker获取消息,减少网络io次数,充分利用磁盘顺序读写的性能,并且还会将每个消息进行压缩传输。
2)磁盘顺序写:因为卡夫卡是将消息记录持久化到磁盘的,而磁盘的顺序读写性能是很高的;
3)零拷贝:采用了sendfile方法,从而允许操作系统将数据从Page Cache直接发送到网络socket缓冲区,只需要最后copy就可将数据复制到NIC缓冲区。
4)Page Cache:卡夫卡充分利用了操作系统的page cache机制,即利用操作系统自身的内存,而不是JVM空间内存,所以卡夫卡的读写操作基本上是基于内存的;
5)分区分段+索引:卡夫卡是采用分布式系统分区分桶的设计思想做的,所以 Kafka的message消息实际上是分布式存储在一个一个小的segment中的, Kafka又默认为分段后的数据文件建立了索引文件
(1)Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。
(2)消费者模型:消费的消费模型有两种:推送模型(push)和拉取(pull)
1) 基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。 消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的。
2) 拉取模型 (kafka): 由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息 。 这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
(3)消费高级API:
1)高级API的优点:①写起来简单;不需要自行去管理offset,系统通过zookeeper自行管理 ; 不需要管理分区,副本等情况,系统自动管理 ;
② 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset) ;
③ 可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响) 。
2)高级API的缺点:① 不能自行控制offset(对于某些特殊需求来说);② 不能细化控制如分区、副本、zk等 。
(4)消费低级API
1)低级API的优点:
① 能够让开发者自己控制offset,想从哪里读取就从哪里读取 ;
② 自行控制连接分区,对分区自定义进行负载均衡 ;
③对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中) 。
2)低级API的缺点:
① 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
(1) **0:**代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
(2)**1:**代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
(3)**all:**代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
auto.offset.reset值含义解释
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
自动提交offset: 在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。
手动提交offset: 生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。
手动提交方式一:同步提交 consumer.commitSync()方式提交
手动提交方式二:异步提交
consumer.commitAsync(callback)方式提交
手动提交partition的offset:
原始数据:
0.0.0.0 0 dat 2019-03-11 22:00:00 2019-03-11 22:00:00 11.184.32.234 1 4 1940 L:0,P:2 52.229.174.233 443 223.146.131.254 5636 F<-C unknown type: 259 2 L:1,P:2 52.229.174.233 443 223.146.131.254 5636 F->C unknown type: 259 2 mac: 56:00:03:0d:c0:02<-00:04:19:00:02:98 unknown type: 259 2 6 1345 10 7136 2019-03-11 22:00:00 2019-03-11 22:00:00 sni 38 .array709-prod.do.dsp.mp.microsoft.com
每条消息会在生产后加上id进行标注,以防每条数据都长一个样子
数据量 | 发送用时(ms) |
---|---|
10000 | 1 300 |
100000 | 16 000 |
1000000 | 150 000 |
数据量 | 发送用时(ms) |
---|---|
1000 | 386 |
10000 | 1 364 |
100000 | 13 160 |
1000000 | 35 139 |
数据量 | 发送用时 |
---|---|
1000 | 186ms |
10000 | 1 079ms |
100000 | 11 018ms |
1000000 | 34 670sm |
10000000 | 305 043ms |
每个测试都经过超过30次的测试,
我们可以看到效率提高了接近5倍的生产效率
目前的问题:
1万数据:总时间:2411ms
10万数据:总时间:19924ms
100万数据:总时间:180000ms
producer端增加线程数量并不能提高发送速率,单线程producer不存在线程切换生产速率更快
数据量 | 发送用时 |
---|---|
10000 | 730ms |
100000 | 794ms |
1000000 | 3 592ms |
10000000 | 33 260ms |
速率再次提高了10倍左右
基于partitions为50的topic进行测试
消费可以达到千兆网卡满带宽:111M/s 持续消费无问题
总数据量:8580万 总时间:228674 ms 平均每秒处理数据:375000
总数据量:3560万 总时间:35390 ms 平均每秒处理数据:1005000
在集群中运行:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config compression.type=lz4
这里的 --alter 命令可以对已经创建的topic进行增加配置修改配置等
在producer中增加:
compression.type=lz4
broker.id=107
listeners=PLAINTEXT://172.16.2.107:9092
num.network.threads=25
num.io.threads=24
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/hadoop/soft/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=slave106:2181,slave107:2181,slave108:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable = false
delete.topic.enable=true
#指定接受消息的代理
bootstrap.servers=172.16.2.107:9092
#当至少有一个follow拷贝到leader中的消息后再进行提交消息已发送
acks=0
#重试次数
retries=3
#控制批量发送消息的大小,减少访问服务端的次数提高性能,16KB
batch.size=16384
#batch.size=32768
#控制消息每隔多少时间进行发送,减少访问服务端次数
linger.ms=500
#设置缓冲区大小,发送速度大于这个值的话会导致缓冲被耗尽,32MB
#buffer.memory=134217728
buffer.memory=268435456
# 设置消息压缩
compression.type=lz4
#设置键,值以什么序列的形式进行发送
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#只需要指定kafka集群中的一个ip就可以了
bootstrap.servers=172.16.2.107:9092
#每个group id都会收到一份kafka 内部的消息
group.id=test
#自动提交 offset
enable.auto.commit=false
#发送的每条消息的key,和value使用的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer= org.apache.kafka.common.serialization.StringDeserializer
Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2018 群英 版权所有 茂名市群英网络有限公司
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号-36 粤公网安备 44090202000006号 粤工商备P091701000595