200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMq消息队列使用

RocketMq消息队列使用

时间:2020-12-12 13:39:58

相关推荐

RocketMq消息队列使用

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,

目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

比kafka还是有过之无不及,其实kafka文档很丰富

但RocketMQ网上的文章太少,找不到相关的操作教程

于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究

下载源码的地址/alibaba/RocketMQ/releases

首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK

<dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>3.2.6</version></dependency>

Downloads

11.3 MBalibaba-rocketmq-3.2.6.tar.gz2.46 MBalibaba-rocketmq-client-java-3.2.6.tar.gzSource code(zip)Source code(tar.gz)

在linux 下用wget 下载源码然后解压出来

在runserver.sh里面可以配置 jvm启动的参数JAVA_OPT_1="-server-Xms4g-Xmx4g-Xmn2g-XX:PermSize=128m-XX:MaxPermSize=320m"

可以 vi runserver.sh

分别给 mqnamesrv mqbroker play.sh 执行的权限

chmod +x mqnamersrv

chmod +x mqbroker

chmod +x play.sh

下面红线框的这段 命令输入错误了,忽略不用看

通过 nohup sh mqnamesrv& 启动 RocketMq

目前没看到结束的命令,也没找到相关的介绍,

我这里用的 ps -ef|grep rocketmq 查到进程pid

然后kill pid号

或则pkill -9 java [慎用]

用jps -v 查看下java进程的参数

rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了

在防火墙配置里面加上 9876端口,设置iptables对外开放

部署Broker

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties &

这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip

Master和Slave的配置文件参考conf目录下的配置文件

Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

部署一Master一Slave,集群采用异步复制方式:

Master:nohupshmqbroker-n"192.168.1.23:9876"-c../conf/2m-2s-async/broker-a.properties&

Slave: nohupshmqbroker-n"192.168.1.23:9876"-c../conf/2m-2s-async/broker-a-s.properties&

package mon.rocketmq;/***************************************************************** 公司名称 :* 系统名称 :信用管家专业版* 类 名 称 :Ios渠道idfa统计,推广统计用* 功能描述 :* 业务描述 :* 作 者 名 :@Author Royal* 开发日期 :-05-15* Created:IntelliJ IDEA**************************************************************** 修改日期 :* 修 改 者 :* 修改内容 :****************************************************************/import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.mon.message.Message;public class Producer {public static void main(String[] args) {DefaultMQProducer producer = new DefaultMQProducer("Producer");producer.setNamesrvAddr("xxxxxxxxxx:9876");try {producer.start();String pushMsg="kafka activeMq rocketMq 消息队列使用1";Message msg = new Message("PushTopic","push","1",pushMsg.getBytes("UTF-8"));SendResult result = producer.send(msg);System.out.println("id:" + result.getMsgId() +" result:" + result.getSendStatus());String pushMsg2="海量级消息记录单机测试2";msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8"));result = producer.send(msg);System.out.println("id:" + result.getMsgId() +" result:" + result.getSendStatus());String pushMsg3="海量级消息记录单机测试3";msg = new Message("PullTopic","pull","1",pushMsg3.getBytes());result = producer.send(msg);System.out.println("id:" + result.getMsgId() +" result:" + result.getSendStatus());} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}}}

启动生成者

启动消费者

package mon.rocketmq;/***************************************************************** 公司名称 :* 系统名称 :信用管家专业版* 类 名 称 :Ios渠道idfa统计,推广统计用* 功能描述 :* 业务描述 :* 作 者 名 :@Author Royal* 开发日期 :-05-15* Created:IntelliJ IDEA**************************************************************** 修改日期 :* 修 改 者 :* 修改内容 :****************************************************************/import java.io.UnsupportedEncodingException;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.mon.consumer.ConsumeFromWhere;import com.mon.message.Message;import com.mon.message.MessageExt;public class Consumer {public static void main(String[] args){DefaultMQPushConsumer consumer =new DefaultMQPushConsumer("PushConsumer");consumer.setNamesrvAddr("xxxxxxxxxxxx:9876");try {consumer.subscribe("PushTopic", "push");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext Context) {Message msg = list.get(0);System.out.println(msg.toString());String recString= null;try {recString = new String(msg.getBody() ,"UTF-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println(recString);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} catch (Exception e) {e.printStackTrace();}}}

以上为单机实例配置

如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^

参考:/a19881029/article/details/34446629

http://sofar./353572/1540874

/loongshawn/article/details/51086876

RocketMq最佳实践

《RocketMQ原理简介》

分布式开放消息系统(RocketMQ)的原理与实践

《RocketMQ用户指南》

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。