消息中间件的作用有,业务解耦,高并发场景削弱流量异步处理,削峰填谷
高可用:备份机制
伸缩性:分区机制
使用场景
日志收集
一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
消息系统
解耦和生产者和消费者、缓存消息等。
用户活动跟踪
Kafka经常被用来记录web用户或者app用户的各种活动
,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,
然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理
比如spark streaming和storm
Quick Start
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-stop.sh
docker-compose启动
vi docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
- "9977:9977"
environment:
JMX_PORT: 9977
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-manager:
image: sheepkiller/kafka-manager
ports:
- "9000:9000"
links:
- zookeeper
- kafka
environment:
ZK_HOSTS: zookeeper:2181
APPLICATION_SECRET: letmein
docker-compose up -d 后台启动服务
docker-compose down 卸载服务(容器也会被删除)
整体流程
关键词概念
Broker
kafka集群是由多个broker组成,一台kafka服务器就是可以简单理解就是一个broker.
如果服务器是在阿里云,记得要开放端口
集群配置要换成内网ip
listeners=PLAINTEXT://172.18.34.235:9094
外网ip
advertised.listeners=PLAINTEXT://120.78.62.137:9094
Topic
某一个消息队列,topic的名称是不能重复的,topic的信息存储在zookeeper
Producer
消息生产者,就是向Kafkabroker发消息的客户端。
发送消息可以指定pation
Consumer
消息消费者,向Kafkabroker取消息的客户端。
同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息
Partition
kafka 会将第i个Partion 分配到第(i%n)个broker上
partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,
来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;
可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.
此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力
为了实现扩展性,一个非常大的topic可以分布到多个broker(服务器)上,一个topic由多个partition组成,
每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Producer通过Partitioner决定消息发送到哪个Partition
kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。
Replica
Kafka 定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。前者对外提供服务
当然了,你可能知道在很多其他系统中追随者副本是可以对外提供服务的,比如 MySQL 的从库是可以处理读操作的,
但是在 Kafka 中追随者副本不会对外提供服务
基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";
leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);
follower只是单调的和leader跟进,同步消息即可
。由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",
kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定。
分区和备份
假设有3个broker集群b1,b2,b3
创建topic时候假设:
指定pation为1和replicationFactor为3,
那么消息将在b1记录为/tmp/log/b1/xx-topic-0,b2和b3都是备份b1的消息/tmp/log/b2/xx-topic-0,/tmp/log/b3/xx-topic-0
指定pation为3和replicationFactor为1,
那么消息将在b1记录为/tmp/log/b1/xx-topic-0,b2为 /tmp/log/b2/xx-topic-1,b3为/tmp/log/b3/xx-topic-2
b1的备份可能在b2或者b3,同理b2,b3备份
也就是说创建topic的时候,为了消息的横向扩展和可备份下,设置partion的值最好和broker一样。
如果为了消息的有序性,可以只设置一个pariton
总结
Kafka 的三层消息架构:
第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;
其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
最后,客户端程序只能与分区的领导者副本进行交互
Kafka Broker 持久化数据
通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,
消息被追加写到当前最新的日志段中,当写满了一个日志段后,
Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。
Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的
Kafka重平衡
假设组内某个实例挂掉了,Kafka 能够自动检测到,然后把这个 Failed 实例之前负责的分区转移给其他活着的消费者
分区位移与消费者位移(Consumer Offset)
生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。
分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9
一旦消息写入后分区位移是不变的
消费者位移是可以随时变化的,每个消费者都可以有自己的消费位移,而且消费者消费的时候还可以指定从头开始消费
重复消费就是指消费者消费完后,partion的位移数据没有提交上去,导致下次消费还是从消费前的位移开始
常见配置
生产者配置
可选参数:
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
acks=1:
只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。
如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。
不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
acks=all:
只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
max.block.ms:指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
batch.size:当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。
retries:指定生产者可以重发消息的次数。
receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
linger.ms:指定了生产者在发送批次前等待更多消息加入批次的时间。
消费者配置
group.id 默认值:无
唯一的指明了consumer的group的名字,group名一样的进程属于同一个consumer group。
consumer.id 默认值:null
如果没有设置的话则自动生成。
socket.timeout.ms 默认值:30 * 1000
socket请求的超时时间。实际的超时时间为max.fetch.wait + socket.timeout.ms。
socket.receive.buffer.bytes 默认值:64 * 1024
socket的receiver buffer的字节大小。
fetch.message.max.bytes 默认值:1024 * 1024
每一个获取某个topic的某个partition的请求,得到最大的字节数,每一个partition的要被读取的数据会加载入内存,所以这可以帮助控制consumer使用的内存。这个值的设置不能小于在server端设置的最大消息的字节数,否则producer可能会发送大于consumer可以获取的字节数限制的消息。
auto.commit.enable 默认值:true
如果设为true,consumer会定时向ZooKeeper发送已经获取到的消息的offset。当consumer进程挂掉时,已经提交的offset可以继续使用,让新的consumer继续工作。
auto.commit.interval.ms 默认值:60 * 1000
consumer向ZooKeeper发送offset的时间间隔。
queued.max.message.chunks 默认值:10
缓存用来消费的消息的chunk的最大数量,每一个chunk最大可以达到fetch.message.max.bytes。
rebalance.max.retries 默认值:4
当一个新的consumer加入一个consumer group时,会有一个rebalance的操作,导致每一个consumer和partition的关系重新分配。如果这个重分配失败的话,会进行重试,此配置就代表最大的重试次数。
fetch.min.bytes 默认值:1
一个fetch请求最少要返回多少字节的数据,如果数据量比这个配置少,则会等待,知道有足够的数据为止。
fetch.wait.max.ms 默认值:100
在server回应fetch请求前,如果消息不足,就是说小于fetch.min.bytes时,server最多阻塞的时间。如果超时,消息将立即发送给consumer.。
rebalance.backoff.ms 默认值:2000
在rebalance重试时的backoff时间。
refresh.leader.backoff.ms 默认值:200
在consumer发现失去某个partition的leader后,在leader选出来前的等待的backoff时间。
auto.offset.reset 默认值:largest
在Consumer在ZooKeeper中发现没有初始的offset时或者发现offset不在范围呢,该怎么做:
* smallest : 自动把offset设为最小的offset。
* largest : 自动把offset设为最大的offset。
* anything else: 抛出异常。
consumer.timeout.ms 默认值:-1
如果在指定的时间间隔后,没有发现可用的消息可消费,则抛出一个timeout异常。
client.id 默认值: group id value
每一个请求中用户自定义的client id,可帮助追踪调用情况。
zookeeper.session.timeout.ms 默认值:6000
ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的心跳,则会被认为该Kafka server挂掉了。如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。
zookeeper.connection.timeout.ms 默认值:6000
client连接到ZK server的超时时间。
zookeeper.sync.time.ms 默认值:2000
一个ZK follower能落后leader多久。
常见问题
什么 Kafka 不像 MySQL 那样允许追随者副本对外提供读服务?
1,kafka的分区已经让读是从多个broker读从而负载均衡,不是MySQL的主从,压力都在主上;
2,kafka保存的数据和数据库的性质有实质的区别就是数据具有消费的概念,是流数据,kafka是消息队列,
所以消费需要位移,而数据库是实体数据不存在这个概念,如果从kafka的follower读,消费端offset控制更复杂;
3,生产者来说,kafka可以通过配置来控制是否等待follower对消息确认的,如果从上面读,
也需要所有的follower都确认了才可以回复生产者,造成性能下降,如果follower出问题了也不好处理
为什么副本不提供写服务
如果副本要提供写服务的话,那么数据就要在所有的副本之间相互同步,n个副本就需要n * n同步数据,
如果采用异步同步数据,数据的一致性和有序性很难保证
为什么副本不提供读服务
1、同步延迟带来的数据不一致
2、kafka是个有序的消息消费,消费进度依赖offset的偏移量,这个偏移量是要保存起来的,如果多个副本进行
读负载均衡,偏移量就很难确定,
3、kafka是个消息队列,副本的作用就是保证消息不丢失,负载均衡已经用分区来保证了。
partition主从副本数据同步
1、生产者发布消息时候 首先通过zk找到该分区的leader服务,然后将消息发给leader,ledaer收到消息后写入到磁盘
2、每个follower从leader上pull消息,写入磁盘后再发送ack
3、一旦leader收到了isr中所有副本到ack,消息就会被commit,leader会向生产者发送ack
消费者读取消息到时候只会从leader里面读取,只有被commit过到消息才会暴露给消费者
kafka 消费者加入consumer-group流程
https://blog.csdn.net/asdfsadfasdfsa/article/details/104883173/
Coordinator组件,其中包含了消费者端的 ConsumerCoordinator 和 Broker 端的 GroupCoordinator
在Broker端GroupCoordinator 负责的是:消费者 group 成员管理以及 offset 提交
首先我们在调用 KafkaConsumer.poll() 时,首先会去调用 ConsumerCoordinator.poll() ,然后也会去调用位移提交的相关操作
1、判断是否需要加入group,如果订阅主题分区发生变化,或者新消费者入组等,需要重新入组。
此时是通过ensureActiveGroup() 发送JoinGroup、SyncGroup,并获取到分配给自身的TopicPartition
2、检测心跳线程是否正常,心跳线程需要定时向GroupCoordinator发送心跳,超过约定阈值就会认为Consumer离组,触发Rebalance
3、如果设置的是自动提交位移,达到时间阈值就会提交offset。
状态流转过程:
最开始消费者group是Empty 状态,当Rebalance 开启后,
会被置于 RreparingRebalance 状态等待成员加入group。
之后当有成员入组时,会变更到CompletingRebalance 状态等待分配方案。分配完成后会流转到Stable 状态完成充平衡。
当有新成员入组或者成员退出时,消费者group 状态从 Stable 直接变为 PreparingRebalance 状态,
此时所有成员都需要重新加入group。当所有的成员都退出组时,状态会变为 Empty。
Kafka 定期自动删除过期位移的条件就是,group要处于 Empty 状态。当消费者 group 停用了很长时间(超过7天),此时Kafka 就可能将其删除
消费者Rebalance的几种场景
1、组成员数量发生变化
2、订阅主题数量发生变化
3、订阅主题的分区数发生变化
Kafka重复消费原因
底层根本原因:已经消费了数据,但是offset没提交。
程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible
默认5秒自动提交,如果这5秒内容程序挂了 就会导致不能提交。
原因1:强行kill线程,导致消费后的数据,offset没有提交。
原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe()
则有可能部分offset没提交,下次重启会重复消费。
原因3(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。
比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),
那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题
那些情景下会造成消息漏消费
消费者没有处理完消息 提交offset(自动提交偏移 未处理情况下程序异常结束)
如何保证消息不丢失
(1) 导致消息丢失的原因?
kafka没有保存消息.
消费者还没消费就提交了offset, 然后消费者重启或宕机, 分区重平衡.
(2) 解决办法
配置partition副本机制.
•default.replication.factor 每个分区的副本数必须大于1.
•min.insync.replicas 与主副本保存同步状态的从副本数必须大于等于1.
•Producer端的配置acks=all, 指数据写入min.insync.replicas个从副本后才算写入成功.
•Producer端的配置retries=MAX(一个很大的值, 表示无线重试的意思), 指数据一旦写入失败, 就无限重试.
关闭自动提交offset, 改为手动提交.
先消费, 消费成功后再手动提交offset.
如何解决Kafka重复消费
1、给每个消息都设置一个独一无二的 key,消费的时候把 key 记录下来,然后每次消费新的消息的时候都查询一下,看当前消息的这个 key 是否消费过,如果没有消费过才进行消费。
2、业务保证幂等
Kafka线上集群部署方案
系统:linux、
磁盘:磁盘用普通机械磁盘(顺序读写操作),没必要用ssd(随机读写性能好,不用使用使用磁盘阵列(RAID),
因为kafka本身提供了供高可靠性与负载均衡功能
磁盘容量:按照消息量预估
每天1亿条1KB大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于1亿 * 1KB * 2 / 1000 / 1000 = 200GB
每天就需要200GB,还要预留10%的磁盘空间,因此总的存储容量就是 220GB。
保存两周,那么整体容量即为 220GB * 14,大约 3TB 左右。
Kafka 支持数据的压缩,假设压缩比是 0.75,
那么最后你需要规划的存储空间就是 0.75 * 3 = 2.25TB
新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩
带宽: 1Gbps 的千兆网络和 10Gbps 的万兆网络
现在你有个业务,其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。
那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢
根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值,
也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源
稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,
故通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps
好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。
根据这个目标,我们每秒需要处理 2336Mb 的数据,除以 240,约等于 10 台服务器。
如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台
consumer的offset应该如何保存呢
https://www.dazhuanlan.com/2019/08/30/5d68208d8f4a9/
https://blog.csdn.net/qq_41049126/article/details/111311816
以前是放在Kafka里面,但是Kafka用作持久化是没问题的,但是用作频繁写并不是太适合,
所以,Kafka后来把对offset的保存放在 __consumer_offsets topic里面。
Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题
如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3
通过 Math.abs(groupID.hashCode()) % numPartitions 可以计算group在这个topic里面那个partion里面
存储格式:
[g1,ggj,3]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1612168618877, expireTimestamp=None)
例如groupid=g1
"g1".hashCode()%50=42
那么日志就在__consumer_offsets-42目录里面
auto.offset.reset值含义解释
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费()
新创建的group会从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
新创建的group会从该group创建后的消息开始消费
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
只要kafka中相同group、partition中已经有提交的offset,则都无法从开始消费
Kafka的Log Retention的理解
kafka留存策略包括 删除和压缩两种
删除: 根据时间和大小两个方式进行删除 大小是整个partition日志文件的大小
超过的会从老到新依次删除 时间指日志文件中的最大时间戳而非文件的最后修改时间
压缩: 相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续
kafka通信模式,是pull还是push
producer是推消息给broker的 因为你不知道啥时候有消息,如果用拉肯定不合理
如果broker推消息给comsumer
优点:实时性高
缺点:
1、消费者能力有限,如果一下子推N多条数据,消费者扛不住
2、broker需要知道消息是被那台机器去消费,而且还要管理每个partion以及消费的进度,还需要与zk通信,增加了broker的工作量
消费者主动的去broker拉消息,消费者可根据自己的消费能力去拉消息
优点:broker只需要存储好生产者发来的数据,具体谁来拉 什么时候拉都不用管了。可以支持批量拉取,消费者根据自身条件选择批量拉取
缺点:消息延迟,消费者可能两秒发一次请求向broker拉数据(不能过于频繁)。 消息忙请求(可能连续拉了几个小时一条消息都没有,做无用功)
kafka同一个groupid下多个consumer订阅同一个topic,只有一个consumer能消费到数据?
topic的某一个partion 只会被同一个groupid的某一个consumer消费到,除非该consumer实例下线,才会触发重平衡
排查:
1、看下这个topic是否只有一个partion
2、看下这个topic发送者是不是只发送到某个partion
rocketmq与kafka区别
1、数据可靠性
RocketMQ支持异步实时刷盘,同步刷盘,同步复制,异步复制
kafka使用异步刷盘方式,异步复制/同步复制
2、性能对比
kafka单机写入TPS约在百万条/秒,消息大小10个字节
RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节
如果让你自己写一个消息队列, 该如何进行架构设计
(1) 分布式
这个消息队列必须分布式的, 这样通过水平扩展集群就可以增加消息队列的吞吐量与容量. 分布式的消息队列必须要有一个master节点来管理整个集群,
可以通过Zookeeper来实现master节点选举算法.
(2) 可用性
一个topic必须支持多个partition, 且partition数量可以增加, 每个partition分布在不同的节点上. partition内通过offset来保证消息的顺序. 同时为了保证可用性, 每个分区必须设置副本, 主副本提供读写服务, 从副本只作备份即可. 当主副本所在的节点宕机后, master节点会在从副本中选出一个作为主副本,
然后当宕机的节点修复后, master节点会将缺失的副本分配过去, 同步数据后, 集群恢复正常.
(3) 高性能
为了保证高吞吐量, 我们可以使用批量压缩, 顺序写, 零拷贝技术.
(4) 解决消息丢失方案
消息必须写入所有副本中才算写入成功.
Kafka为什么速度那么快?
https://mp.weixin.qq.com/s/qYN3cZGZmiiUVY6EryzPOQ
Kafka的核心特性之一就是高吞吐率, 但Kafka的数据是存储在磁盘上的, 一般认为在磁盘上读写数据性能很低, 那Kafka是如何做到高吞吐率的呢?
Kafka高吞吐率的秘诀在于, 它把所有的消息都进行批量压缩, 提升网络IO, 通过顺序写和零拷贝技术提升磁盘IO
高效使用磁盘
1、kafka的整个设计中,partition相当于一个非常长的数组,broker接受到到消息顺序的写到这个大数组里面,同时消费者通过offset顺序消费这些数据,并且不会
删除已经消费的数据,避免了随机读写磁盘的操作
2、Kafka顺序存写数据,故删除时删除对应的Segment(物理文件,disk),避免对文件的随机写操作
3、利用了页缓存pagecache
4、支持多disk drive
如果机器上有多个Disk Drive,可将不同的Disk挂载到不同的目录,然后将这些目录都配置到log.dirs里。
Kafka会尽可能将不同的Partition分配到不同的目录,也即不同的Disk上,从而充分利用了多Disk的优势
2、批处理
处理是一种常用的用于提高I/O性能的方式。对Kafka而言,批处理既减少了网络传输的Overhead,又提高了写磁盘的效率。
3、零拷贝
传统模式下的四次拷贝与四次上下文切换
例如:
buffer = File.read
Socket.send(buffer)
上面这个过程实际发生了四次数据拷贝,
1、首先通过系统调用将文件数据读入到内核状态buffer(dma拷贝),
2、然后应用程序再将内存状态buffer数据读入到用户状态buffer,(cpu拷贝)
3、用户程序发送socket的时候 将用户状态buffer数据拷贝到内核状态(cpu拷贝)
4、最后通过dma拷贝将数据拷贝到nic buffer(网卡缓冲),同时还伴随着四次上下文切换
传统的读取文件数据并发送到网络的步骤如下:
(1)操作系统将数据从磁盘文件中读取到内核空间的页面缓存;
(2)应用程序将数据从内核空间读入用户空间缓冲区;
(3)应用程序将读到数据写回内核空间并放入socket缓冲区;
(4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。
sendfile和transferTo实现零拷贝,Linux 2.4+内核通过sendfile系统调用,提供了零拷贝
数据通过DMA拷贝到内核态Buffer后,直接通过DMA(Direct Memory Access,直接内存存取)拷贝到NIC Buffer,无需CPU拷贝
因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
从具体实现来看,Kafka的数据传输通过TransportLayer来完成,其子类PlaintextTransportLayer
通过Java NIO的FileChannel的transferTo和transferFrom方法实现零拷贝
如果有10个消费者,传统方式下,数据复制次数为4*10=40次,
而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存
Kafka的日志结构与数据存储
Kafka中的消息存储在物理上是以一个或多个分区(Partition)构成,每个分区对应本地磁盘上的一个文件夹,
每个文件夹内包含了日志索引文件(“.index”和“.timeindex”)和日志数据文件(“.log”)
在Kafka中,每个Log对象又可以划分为多个LogSegment文件,
每个LogSegment文件包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。
参考网站
http://trumandu.github.io/2019/04/13/Kafka%E9%9D%A2%E8%AF%95%E9%A2%98%E4%B8%8E%E7%AD%94%E6%A1%88%E5%85%A8%E5%A5%97%E6%95%B4%E7%90%86/