这篇文章主要讲解了“怎么理解kafka分区、生产和消费”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么理解kafka分区、生产和消费”吧!
kafka分区说明
分区规则指的是将每个Topic划分成多个分区(Partition),每个分区是一组有序的消息日志,生产者生产的每条消息只会被发送到其中一个分区。
分区 (Partition) 都是一个有序的、不可变的数据序列,消息数据被不断的添加到序列的尾部。分区中的每一条消息数据都被赋予了一个连续的数字ID,即偏移量 (offset) ,用于唯一标识分区中的每条消息数据。
分区(Partition)的作用就是提供负载均衡的能力,单个topic的不同分区可存储在相同或不同节点机上,为实现系统的高伸缩性(Scalability),不同的分区被放置到不同节点的机器上,各节点机独立地执行各自分区的读写任务,如果性能不足,可通过添加新的节点机器来增加整体系统的吞吐量。
kafka分区结构
Kafka分区下数据使用消息日志(Log)方式保存数据,具体方式是在磁盘上创建只能追加写(Append-only)消息的物理文件。因为只能追加写入,因此避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作。Kafka日志文件分为多个日志段(Log Segment),消息被追加写到当前最新的日志段中,当写满一个日志段后Kafka会自动切分出一个新的日志段,并将旧的日志段封存。
Kafka将消息数据根据Partition进行存储,Partition分为若干Segment,每个Segment的大小相等。Segment由index file、log file、timeindex file等组成,后缀为".index"和".log",分别表示为Segment索引文件、数据文件,每一个Segment存储着多条信息。 
kafka分区策略
官方分区策略
轮询策略
轮询策略(Round-robin),即顺序分配策略。如果一个Topic有3个分区,则第1条消息被发送到分区0,第2条被发送到分区1,第3条被发送到分区2,以此类推。当生产第4条消息时又会重新轮询将其分配到分区0。
轮询策略是Kafka Java生产者API默认提供的分区策略。如果未指定partitioner.class参数,那么生产者程序会按照轮询的方式在Topic的所有分区间均匀地存储消息。轮询策略有非常优秀的负载均衡表现,能保证消息最大限度地被平均分配到所有分区上。
随机策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
按消息键保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
基于地理位置的分区策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isChina(p.leader().host())).map(PartitionInfo::partition).findAny().get();
自定义分区策略
如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class。编写生产者程序时,可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner
接口(partition()和close()),通常只需要实现最重要的partition方法。
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。设置partitioner.class参数为自己实现类的Full Qualified Name,生产者程序就会按照自定义分区策略的代码逻辑对消息进行分区。
kafka分区存储策略
kafka压缩
生产者压缩
Kafka 2.1.0版本前,支持GZIP、Snappy、LZ4三种压缩算法。2.1.0版本开始正式支持Zstandard算法(简写为zstd ,Facebook开源的一个压缩算法),该算法能够提供超高的压缩比(compression ratio)。压缩算法可以使用压缩比和压缩/解压缩吞吐量两个指标进行衡量。不同压缩算法的性能比较如下: 
生产环境中,GZIP、Snappy、LZ4、zstd性能表现各有千秋,在吞吐量方面:LZ4 > Snappy > zstd > GZIP;在压缩比方面,zstd > LZ4 > GZIP > Snappy。
如果要启用Producer端的压缩,Producer程序运行机器上的CPU资源必须充足。除了CPU资源充足,如果生产环境中带宽资源有限,也建议Producer端开启压缩。通常,带宽比CPU和内存要昂贵的多,因此千兆网络中Kafka集群带宽资源耗尽很容易出现。如果客户端机器CPU资源富余,建议Producer端开启zstd压缩,可以极大地节省网络资源消耗。对于解压缩,需要避免非正常的解压缩,如消息格式转换的解压缩操作、Broker与Producer解压缩算法不一致。
消费者解压缩
kafka分区消息保序
存储消息保序
如果将Topic设置成单分区,该Topic的所有的消息都只在一个分区内读写,保证全局的顺序性,但将丧失Kafka多分区带来的高吞吐量和负载均衡的性能优势。
多分区消息保序的方法是按消息键保序策略,根据业务提取出需要保序的消息的逻辑主体,并建立消息标志位ID,,对标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区,既可以保证分区内的消息顺序,也可以享受到多分区带来的搞吞吐量。
说明:消息重试只是简单将消息重新发送到原来的分区,不会重新选择分区。
消费消息保序
消息路由策略
生产者
消息生产过程
Producer先通过分区策略确定数据录入的partition,再从Zookeeper中找到Partition的Leader
Producer将消息发送给分区的Leader。
Leader将消息接入本地的Log,并通知ISR(In-sync Replicas,副本同步列表)的Followers。
ISR中的Followers从Leader中pull消息,写入本地Log后向Leader发送ACK(消息发送确认机制)。
Leader收到所有ISR中的Followers的ACK后,增加HW(high watermark,最后commit 的offset)并向Producer发送ACK,表示消息写入成功。
生产者保证发送成功
必须使用producer.send(msg, callback)接口发送消息。
Producer端设置acks参数值为all。acks参数值为all表示ISR中所有Broker副本都接收到消息,消息才算已提交。
设置Producer端retries参数值为一个较大值,表示Producer自动重试次数。当出现网络瞬时抖动时,消息发送可能会失败,此时Producer能够自动重试消息发送,避免消息丢失。
设置Broker端unclean.leader.election.enable = false,unclean.leader.election.enable参数用于控制有资格竞选分区Leader的Broker。如果一个Broker落后原Leader太多,那么成为新Leader必然会造成消息丢失。因此,要将unclean.leader.election.enable参数设置成false。
设置Broker端参数replication.factor >= 3,将消息保存多份副本。
设置Broker参数min.insync.replicas > 1,保证ISR中Broker副本的最少个数,在acks=-1时才生效。设置成大于1可以提升消息持久性,生产环境中不能使用默认值 1。
必须确保replication.factor > min.insync.replicas,如果两者相等,那么只要有一个副本挂机,整个分区无法正常工作。推荐设置成replication.factor = min.insync.replicas + 1。
确保消息消费完成再提交。设置Consumer端参数enable.auto.commit为false,并采用手动提交位移的方式。
生产者拦截器
Producer端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口有两个核心的方法:
假设第一个拦截器的完整类路径是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个拦截器是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,Producer指定拦截器的Java代码示例如下:
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
消费者
消费过程
Consumer向Broker提交连接请求,连接的Broker会向其发送Broker Controller的通信URL,即配置文件中的listeners地址;
当Consumer指定了要消费的Topic后,会向Broker Controller发送消费请求;
Broker Controller会为Consumer分配一个或几个Partition Leader,并将Partition的当前offset发送给Consumer;
Consumer会按照Broker Controller分配的Partition对其中的消息进行消费;
当Consumer消费完消息后,Consumer会向Broker发送一个消息已经被消费反馈,即消息的offset;
在Broker接收到Consumer的offset后,会更新相应的__consumer_offset中;
消费者拦截器
重复消费问题的解决方案
同一个Consumer重复消费
不同的Consumer重复消费
感谢各位的阅读,以上就是“怎么理解kafka分区、生产和消费”的内容了,经过本文的学习后,相信大家对怎么理解kafka分区、生产和消费这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是天达云,小编将为大家推送更多相关知识点的文章,欢迎关注!