200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > kafka7 探索生产者同步or异步发送消息

kafka7 探索生产者同步or异步发送消息

时间:2021-02-16 01:41:57

相关推荐

kafka7 探索生产者同步or异步发送消息

1.生产者:在发送完消息后,收到回执确认。

主要是在SimpleProducer.java中修改了发送消息的2行代码,用到了回调函数,修改如下:

//发送消息ProducerRecord<String, String> rec = new ProducerRecord<String, String>("test-topic","hello world from win7");producer.send(rec,new Callback() {public void onCompletion(RecordMetadata metadata,Exception exception) {System.out.println("ack!!!");}}); //在发送消息后,收到回执确认。

完整代码如下:

1 package cn.test.mykafka; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.Callback; 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.Producer; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 import org.apache.kafka.clients.producer.RecordMetadata;10 11 /**12 * 简单生产者:在发送完消息后,收到回执确认。13 *14 */15 16 public class SimpleProducer2 {17 18public static void main(String[] args) {19 20//创建配置信息21Properties props = new Properties();22props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的节点和端口23props.put("acks", "all");24props.put("retries", 0);25props.put("batch.size", 16384);26props.put("linger.ms", 1);27props.put("buffer.memory", 33554432);28props.put("key.serializer", "org.mon.serialization.StringSerializer");29props.put("value.serializer", "org.mon.serialization.StringSerializer");30 31//创建一个生产者32Producer<String, String> producer = new KafkaProducer<>(props);3334 //发送消息35ProducerRecord<String, String> rec = new ProducerRecord<String, String>("test-topic","hello world from win7");36producer.send(rec,new Callback() {37 public void onCompletion(RecordMetadata metadata,Exception exception) {38 System.out.println("ack!!!");39 }40}); //在发送消息后,收到回执确认。4142//for (int i = 0; i < 10; i++)43// producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); //topic,key(非必填),value 44 45System.out.println("over");46producer.close();47}48 }

SimpleProducer2.java

2.比较同步和异步生产者消息发送速度。

完整代码如下:

1 package cn.test.mykafka; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.Callback; 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.Producer; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 import org.apache.kafka.clients.producer.RecordMetadata;10 11 /**12 * 比较同步和异步生产者消息发送速度,参数未生效,失败13 *14 */15 16 public class SimpleProducer3 {17 18static long starttime;19public static void main(String[] args) {20 21//创建配置信息22Properties props = new Properties();23props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的节点和端口24props.put("acks", "all");25props.put("retries", 0);26props.put("batch.size", 16384);27props.put("linger.ms", 1);28props.put("buffer.memory", 33554432);29props.put("key.serializer", "org.mon.serialization.StringSerializer");30props.put("value.serializer", "org.mon.serialization.StringSerializer");31props.put("producer.type", "async"); //sync为同步,async为异步,此配置未生效3233//创建一个生产者34Producer<String, String> producer = new KafkaProducer<>(props);3536StringBuilder builder = new StringBuilder();37for( int i = 0 ; i < 100000 ; i++) {38 builder.append(" " + i + ",");39}4041 //发送消息42ProducerRecord<String, String> rec = new ProducerRecord<String, String>("test-topic",builder.toString());43producer.send(rec,new Callback() {44 public void onCompletion(RecordMetadata metadata,Exception exception) {45 System.out.println("receive ack : "+ (System.currentTimeMillis()-starttime) + "ms");46 }47}); //在发送消息后,收到回执确认4849starttime = System.currentTimeMillis();50System.out.println("over");51producer.close();52}53 }

SimpleProducer3.java

kafka同步生产者:这个生产者写一条消息的时候,它就立马发送到某个分区去。follower还需要从leader拉取消息到本地,follower再向leader发送确认,leader再向客户端发送确认。由于这一套流程之后,客户端才能得到确认,所以很慢。

kafka异步生产者:这个生产者写一条消息的时候,先是写到某个缓冲区,这个缓冲区里的数据还没写到broker集群里的某个分区的时候,它就返回到client去了。虽然效率快,但是不能保证消息一定被发送出去了。

客户端向topic发送数据分为两种方式:

producer.type=sync 同步模式

producer.type=async 异步模式

执行以上代码(通过控制producer.type参数取值sync/async)时,输出警告:

WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'producer.type' was supplied but isn't a known config.

这说明producer.type参数配置根本没生效,后来我在官方文档中也没有找到这个参数,估计在kafka 2.0.0版本中此参数已经没有了。

于是乎,我在网上找了另一段代码(参考博客),修改后如下:

1 package cn.test.mykafka; 2 3 import java.util.Properties; 4 import java.util.concurrent.Future; 5 6 import org.apache.kafka.clients.producer.Callback; 7 import org.apache.kafka.clients.producer.KafkaProducer; 8 import org.apache.kafka.clients.producer.Producer; 9 import org.apache.kafka.clients.producer.ProducerRecord; 10 import org.apache.kafka.clients.producer.RecordMetadata; 11 12 /** 13 * 比较同步和异步生产者消息发送速度,有2个问题: 1.我不缺定starttime的取值位置是否正确? 2.时间差不多,没得出啥结论 14 * 15 */ 16 17 public class SimpleProducer4 { 18 19static long starttime; 20 21StringBuilder builder = new StringBuilder(); 22 23public void initBuilder() { 24 for (int i = 0; i < 100000; i++) { 25 builder.append(" " + i + ","); 26 } 27} 28 29private Properties kafkaProps = new Properties(); 30 31/** 32* 初始化一些配置信息 33*/ 34public void initProperty() { 35 kafkaProps.put("bootstrap.servers", "192.168.42.133:9092"); // 指定broker的节点和端口 36 kafkaProps.put("acks", "all"); 37 kafkaProps.put("retries", 0); 38 kafkaProps.put("batch.size", 16384); 39 kafkaProps.put("linger.ms", 1); 40 kafkaProps.put("buffer.memory", 33554432); 41 kafkaProps.put("key.serializer", "org.mon.serialization.StringSerializer"); 42 kafkaProps.put("value.serializer", "org.mon.serialization.StringSerializer"); 43} 44 45/** 46* 加载配置信息,生成一个生产者实例 47* 48* @param props 49* @return 50*/ 51public Producer<String, String> getProducer(Properties props) { 52 if (props == null || props.size() == 0) 53 throw new IllegalArgumentException(); 54 return new KafkaProducer<>(props); 55} 56 57/** 58* 同步发送消息 59* 60* @param producer 61* @throws Exception 62*/ 63public void syncSend(Producer<String, String> producer) throws Exception { 64 65 ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic", 66 builder.toString() + "this is a sygn record"); 67 68 // 同步发送消息,消息发送成功后,服务端会返回给一个RecordMetadata对象 69 Future<RecordMetadata> future = producer.send(record); 70 starttime = System.currentTimeMillis(); 71 RecordMetadata metadata = future.get(); 72 73 System.out.println("offset:" + metadata.offset() + "\npartition:" + metadata.partition() + "\ntopic:" 74 + metadata.topic() + "\nserializedKeySize:" + metadata.serializedKeySize() + "\nserializedValueSize:" 75 + metadata.serializedValueSize() + "\nreceive sygn ack : " + (System.currentTimeMillis() - starttime) 76 + "ms" + "\n"); 77 78 producer.close(); 79} 80 81/** 82* 异步发送消息 83* 84* @param producer 85*/ 86public void asyncSend(Producer<String, String> producer) { 87 88 ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic", 89 builder.toString() + "this is a asygn record"); 90 91 producer.send(record, new Callback() { 92 public void onCompletion(RecordMetadata metadata, Exception e) { 93 System.out.println("offset:" + metadata.offset() + "\npartition:" + metadata.partition() + "\ntopic:" 94+ metadata.topic() + "\nserializedKeySize:" + metadata.serializedKeySize() 95+ "\nserializedValueSize:" + metadata.serializedValueSize() + "\n"); 96 if (e == null) { 97 System.out.println("\nreceive asygn ack : " + (System.currentTimeMillis() - starttime) + "ms"); 98 } 99 }100 });101 starttime = System.currentTimeMillis();102 producer.close();103}104 105public void start() throws Exception {106 initBuilder();107 initProperty();108 // syncSend(getProducer(kafkaProps));109 asyncSend(getProducer(kafkaProps));110 111}112 113public static void main(String[] args) throws Exception {114 115 SimpleProducer4 myProducer = new SimpleProducer4();116 myProducer.start();117}118 }

SimpleProducer4.java

执行之后,发现同步和异执行时间差不多,没能证明异步比同步快。推测原因有2个:

1.消息太短,存在偶然性,看不出时间差;

2.我不确定这段代码是否正确?同步代码是否正确?异步代码是否正确?开始时间取值位置是否正确?

好吧,心好累。关于生产者的同步异步问题就先到这里吧,虽然结果不尽人意,但是过程中也学到了很多,以后有时间再继续吧。

PS:以上纯粹是我的探索测试,如果有不对的地方,欢迎留言指正,不胜感激。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。