kafka_2.12-2.4.x-生产者发送消息的分区策略(源码分析) 作者:马育民 • 2021-12-02 20:19 • 阅读:10164 # 说明 当 topic 有多个分区时,一个生产者将消息发到某个分区的策略,就叫分区策略,如下: 1. 指定分区号,数据会直接发送到所指定的分区 2. 没有指定分区号,指定了数据的key,可以通过key获取hashCode决定数据发送到哪个分区 3. 既没有给定分区号,也没有给定key值,会采取round-robin fashion,是kafka的轮询策略 4. 自定义分区策略 # 源码分析 调用流程如下图: [![](/upload/0/0/1IX4WFOn0DMa.jpg)](/upload/0/0/1IX4WFOn0DMa.jpg) ### 第一步 调用 `KafkaProducer` 类的下面方法,返回分区号: ``` private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); //当有 分区号 时,就直接返回分区号 return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } ``` 没有 分区号 时,就执行 `this.partitioner.partition()` 方法,见第二步 ### 第二步 调用 `DefaultPartitioner` 类的下面方法: ``` public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //取出该主题的分区列表 List partitions = cluster.partitionsForTopic(topic); //得到分区数量 int numPartitions = partitions.size(); //如果key为空,轮询的方式得到分区号 if (keyBytes == null) { //获取下一个数字,目的是轮询发送给各个分区 int nextValue = this.nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic); //如果可用的 分区数量大于0 if (availablePartitions.size() > 0) { //对可用分区数量取模 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { //对总共分区数量取模 return Utils.toPositive(nextValue) % numPartitions; } } else { //如果key不为空,就根据 keyBytes 算出 `hashCode`,在对分区数取模 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } ``` ### nextValue() 在多线程中,以原子的方式 `+1` ``` private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic); //如果取出的数字为null,就创建一个随机数,并放入 if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } //不为空就+1,并返回 return counter.getAndIncrement(); } ``` 原文出处:http://malaoshi.top/show_1IX2KpgfvbBj.html