200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > kafka-生产者消息发送流程

kafka-生产者消息发送流程

时间:2023-01-06 06:33:17

相关推荐

kafka-生产者消息发送流程

消息发送

Producer创建时,会创建一个Sender线程并设置为守护线程。生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。落盘到broker成功,返回生产元数据给生产者。元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回

必要参数配置

broker配置 配置条目的使用方式

2. 配置参数

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。

序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.mon.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。

package org.mon.serialization;import java.io.Closeable;import java.util.Map;/*** 将对象转换为byte数组的接口* *该接口的实现类需要提供无参构造器* @param <T> 从哪个类型转换*/public interface Serializer<T> extends Closeable {/*** 类的配置信息* @param configs key/value pairs* @param isKey key的序列化还是value的序列化*/void configure(Map<String, ?> configs, boolean isKey);/*** 将对象转换为字节数组* *@param topic 主题名称* @param data 需要转换的对象* @return 序列化的字节数组*/byte[] serialize(String topic, T data);/*** 关闭序列化器* 该方法需要提供幂等性,因为可能调用多次。*/@Overridevoid close();

系统提供了该接口的子接口以及实现类:

org.mon.serialization.ByteArraySerializer

org.mon.serialization.ByteBufferSerializer

org.mon.serialization.BytesSerializer

org.mon.serialization.DoubleSerializer

org.mon.serialization.FloatSerializer

org.mon.serialization.IntegerSerializer

org.mon.serialization.StringSerializer

org.mon.serialization.LongSerializer

自定义序列化器

数据的序列化一般生产中使用avro。

自定义序列化器需要实现org.mon.serialization.Serializer接口,并实现其中

的 serialize 方法

public class User {private Integer userId;private String username;public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}}

实现接口

package com.liu.kafka.serializer;import org.mon.errors.SerializationException;import org.mon.serialization.Serializer;import java.io.UnsupportedEncodingException;import java.nio.ByteBuffer;import java.util.Map;public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}}

生产者

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// 指定初始连接用到的broker地址configs.put("bootstrap.servers", "192.168.181.140:9092");// 指定key的序列化类configs.put("key.serializer", StringSerializer.class);// 指定value的序列化类configs.put("value.serializer", UserSerializer.class);// configs.put("acks", "all");// configs.put("reties", "3");KafkaProducer<String, User> producer = new KafkaProducer<String,User>(configs);User user = new User();user.setUserId(1001);user.setUsername("张三");ProducerRecord<String, User> record = new ProducerRecord<>("tp_user_01",0,user.getUsername(),user);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功:"+ metadata.topic() + "\t"+ metadata.partition() + "\t"+ metadata.offset());} else {System.out.println("消息发送异常");}});// 关闭生产者producer.close();}}

分区器

默认(DefaultPartitioner)分区计算:

如果record提供了分区号,则使用record提供的分区号如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。会首先在可用的分区中分配分区号如果没有可用的分区,则在该主题所有分区中分配分区号

如果要自定义分区器,则需要

首先开发Partitioner接口的实现类在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)

位于 org.apache.kafka.clients.producer 中的分区器接口:

package org.apache.kafka.clients.producer;import org.mon.Configurable;import org.mon.Cluster;import java.io.Closeable;/*** 分区器接口*/public interface Partitioner extends Configurable, Closeable {/*** 为指定的消息记录计算分区值* *@param topic 主题名称* @param key 根据该key的值进行分区计算,如果没有则为null。* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为null* @param value 根据value值进行分区计算,如果没有,则为null* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为null* @param cluster 当前集群的元数据*/public int partition(String topic, Object key, byte[] keyBytes, Objectvalue, byte[] valueBytes, Cluster cluster);/*** 关闭分区器的时候调用该方法*/public void close();}

包 org.apache.kafka.clients.producer.internals 中分区器的默认实现

package org.apache.kafka.clients.producer.internals;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.atomic.AtomicInteger;import org.apache.kafka.clients.producer.Partitioner;import org.mon.Cluster;import org.mon.PartitionInfo;import org.mon.utils.Utils;/*** 默认的分区策略:* *如果在记录中指定了分区,则使用指定的分区* 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区* 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区*/public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = newConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}/*** 为指定的消息记录计算分区值* *@param topic 主题名称* @param key 根据该key的值进行分区计算,如果没有则为null。* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为null* @param value 根据value值进行分区计算,如果没有,则为null* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为null* @param cluster 当前集群的元数据*/public int partition(String topic, Object key, byte[] keyBytes, Objectvalue, byte[] valueBytes, Cluster cluster) {// 获取指定主题的所有分区信息List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 分区的数量int numPartitions = partitions.size();// 如果没有提供keyif (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions =cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) %availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-availablepartitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partition// 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模return Utils.toPositive(Utils.murmur2(keyBytes)) %numPartitions;}} private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = newAtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter =topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}} return counter.getAndIncrement();} public void close() {}}

可以实现Partitioner接口自定义分区器:

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

拦截器

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用

于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是

org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。

close:关闭Interceptor,主要用于执行一些资源清理工作

,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意

实体类

package com.lagou.kafka.demo.entity;public class User {private Integer userId;private String username;public Integer getUserId() {return userId;} public void setUserId(Integer userId) {this.userId = userId;} public String getUsername() {return username;} public void setUsername(String username) {this.username = username;}}

自定义序列化器

public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}}

自定义分区器

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

自定义拦截器1

package com.liu.kafka.Interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.mon.header.Headers;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;public class InterceptorOne <KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER =LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器1---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptorOne".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,VALUE>(topic,partition,timestamp,key,value,headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器1---back");if (exception != null) {// 如果发生异常,记录日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

自定义拦截器2

package com.liu.kafka.Interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.mon.header.Headers;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;public class InterceptorTwo<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器2---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptorTwo".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,VALUE>(topic,partition,timestamp,key,value,headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器2---back");if (exception != null) {// 如果发生异常,记录日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

自定义拦截器3

package com.liu.kafka.Interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.mon.header.Headers;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;public class InterceptorThree<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器3---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptorThree".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,VALUE>(topic,partition,timestamp,key,value,headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器3---back");if (exception != null) {// 如果发生异常,记录日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

生产者

package com.liu.kafka.producer;import com.liu.kafka.partitioner.MyPartitioner;import com.liu.kafka.serializer.User;import com.liu.kafka.serializer.UserSerializer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.mon.serialization.StringSerializer;import java.util.HashMap;import java.util.Map;public class MyProducer1 {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.181.140:9092");// 设置自定义分区器// configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);configs.put("partitioner.class","com.liu.kafka.partitioner.MyPartitioner");// 设置拦截器configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.liu.kafka.Interceptor.InterceptorOne," +"com.liu.kafka.Interceptor.InterceptorTwo," +"com.liu.kafka.Interceptor.InterceptorThree");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);// 设置自定义的序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,UserSerializer.class);KafkaProducer<String, User> producer = new KafkaProducer<String,User>(configs);User user = new User();user.setUserId(1001);user.setUsername("张三");ProducerRecord<String, User> record = new ProducerRecord<>("tp_user_01",0,user.getUsername(),user);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功:"+ metadata.topic() + "\t"+ metadata.partition() + "\t"+ metadata.offset());} else {System.out.println("消息发送异常");}});// 关闭生产者producer.close();}}

原理

KafkaProducer有两个基本线程

主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;

消息收集器RecoderAccumulator为每个分区都维护了一个Deque 类型的双端队列。

ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;

由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。

每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

Sender线程
该线程从消息收集器获取缓存的消息,将其处理为 <Node, List 的形式, Node 表示集群的broker节点。进一步将<Node, List转化为<Node, Request>形式,此时才可以向服务端发送数据。在发送之前,Sender线程将消息以 Map<NodeId, Deque> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

生产者参数配置补充

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。