200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > Kafka 生产者消息发送流程

Kafka 生产者消息发送流程

时间:2021-05-08 04:01:11

相关推荐

Kafka 生产者消息发送流程

1. 数据生产流程解析

Producer创建时,会创建一个Sender线程并设置为守护线程。生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。落盘到broker成功,返回生产元数据给生产者。元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

2. 配置参数:

配置使用方式:

Map<String, Object> configs = new HashMap();configs.put("bootstrap.servers", "node1:9092");configs.put("key.serializer","org.mon.serialization.StringSerializer");configs.put("value.serializer", "org.mon.serialization.StringSerializer");configs.put("acks", "all");configs.put("compression", "gzip");configs.put("retries", 3);//key可以使用ProducerConfig常量// configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

*配置参数:

3. 自定义序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。

序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.mon.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。

要序列化的实体类

import lombok.Data;/*** 用户自定义的封装消息的实体类*/@Datapublic class User {private Integer userId;private String username;}

自定义序列化方器

public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing// 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的}@Overridepublic byte[] serialize(String topic, User data) {try {if (data == null) {return null;} else {final Integer userId = data.getUserId();final String username = data.getUsername();if (userId != null) {if (username != null) {final byte[] bytes = username.getBytes("UTF-8");int length = bytes.length;// 第一个4个字节用于存储userId的值// 第二个4个字节用于存储username字节数组的长度int值// 第三个长度,用于存放username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);// 设置userIdbuffer.putInt(userId);// 设置username字节数组长度buffer.putInt(length);// 设置username字节数组buffer.put(bytes);// 以字节数组形式返回user对象的值return buffer.array();}}}} catch (Exception e) {throw new SerializationException("数据序列化失败");}return null;}@Overridepublic void close() {// do nothing// 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。}}

使用自定义序列化器发送消息

public class MyProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 设置自定义的序列化器configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);User user = new User();user.setUserId(400);user.setUsername("赵四");ProducerRecord<String, User> record = new ProducerRecord<String, User>("tp_user_01", // topicuser.getUsername(), // keyuser // value);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.out.println("消息发送异常");} else {System.out.println("主题:" + metadata.topic() + "\t"+ "分区:" + metadata.partition() + "\t"+ "生产者偏移量:" + metadata.offset());}}});// 关闭生产者producer.close();}}

4. 自定义分区器

默认(DefaultPartitioner)分区计算:

如果record提供了分区号,则使用record提供的分区号如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。

- 会首先在可用的分区中分配分区号

- 如果没有可用的分区,则在该主题所有分区中分配分区号。

如果要自定义分区器,则需要

首先开发Partitioner接口的实现类在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)

自定义分区器

/*** 自定义分区器*/public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

使用自定义分区器发送消息

public class MyProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 指定自定义的分区器configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);// 此处不要设置partition的值ProducerRecord<String, String> record = new ProducerRecord<String, String>("tp_part_01","mykey","myvalue");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.out.println("消息发送失败");} else {System.out.println(metadata.topic());System.out.println(metadata.partition());System.out.println(metadata.offset());}}});// 关闭生产者producer.close();}}

5. 自定义拦截器

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消 息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程 中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任 何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送 失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发 送效率。close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。 另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个 Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

自定义拦截器:

实现ProducerInterceptor接口在KafkaProducer的设置中设置自定义的拦截器

自定义拦截器

public class InterceptorOne implements ProducerInterceptor<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {System.out.println("拦截器1 -- go");// 消息发送的时候,经过拦截器,调用该方法// 要发送的消息内容final String topic = record.topic();final Integer partition = record.partition();final Integer key = record.key();final String value = record.value();final Long timestamp = record.timestamp();final Headers headers = record.headers();// 拦截器拦下来之后根据原来消息创建的新的消息// 此处对原消息没有做任何改动ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic,partition,timestamp,key,value,headers);// 传递新的消息return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器1 -- back");// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务// 会影响kafka生产者的性能。}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {final Object classContent = configs.get("classContent");System.out.println(classContent);}}

public class InterceptorTwo implements ProducerInterceptor<Integer, String> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {System.out.println("拦截器2 -- go");// 消息发送的时候,经过拦截器,调用该方法// 要发送的消息内容final String topic = record.topic();final Integer partition = record.partition();final Integer key = record.key();final String value = record.value();final Long timestamp = record.timestamp();final Headers headers = record.headers();// 拦截器拦下来之后根据原来消息创建的新的消息// 此处对原消息没有做任何改动ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic,partition,timestamp,key,value,headers);// 传递新的消息return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器2 -- back");// 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务// 会影响kafka生产者的性能。}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {final Object classContent = configs.get("classContent");System.out.println(classContent);}}

public class MyProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应// 此时可以保证发送消息即使在重试的情况下也是有序的。configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);// configs.put("max.in.flight.requests.per.connection", 1);// interceptor.classes// 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.lagou.kafka.demo.interceptor.InterceptorOne," +"com.lagou.kafka.demo.interceptor.InterceptorTwo");configs.put("classContent", "this is lagou's kafka class");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("tp_inter_01",0,1001,"this is lagou's 1001 message");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息发送成功:" + metadata.topic() + " " + metadata.offset());}}});// 关闭生产者producer.close();}}

运行结果

拦截器1 -- go拦截器2 -- go拦截器1 -- back拦截器2 -- back消息发送成功: tp_inter_01 1

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。