什么是rocketmq?主从如何配置?

什么是rocketmq?

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:
1、支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
2、在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
3、支持拉(pull)和推(push)两种消息模式
4、单一队列百万消息的堆积能力
5、支持多种消息协议,如 JMS、MQTT 等
6、分布式高可用的部署架构,满足至少一次消息传递语义
7、提供 docker 镜像用于隔离测试和云集群部署
8、提供配置、指标和监控等功能丰富的 Dashboard

名词解释:
Producer:消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
Consumer:消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
Consumer Group:消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
Topic:Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
Message:Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。
Tag:标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Broker:Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
Name Server:Name Server 为 producer 和 consumer 提供路由信息。

RocketMQ集群部署结构:

rocketmq简单使用:

1、环境介绍:
centos 7.5
Java 1.8
rocketmq 3.5.8

2、配置使用:

下载下面的gz包,直接解压就可以使用:
# wget http://mirror.bit.edu.cn/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip

3、启动服务:
# tar zxf rockermq.tar.gz -C /usr/local
# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &
# nohup sh mqbroker -n localhost:9876 &

4、关闭服务:
# cd /usr/local/rocketmq/bin
# sh mqshutdown broker
# sh mqshutdown namesrv

rocketmq的主从如何配置?

rocketmq的主从配置分为三种:多Master模式, 多Master多Slave模式,同步双写;多Master多Slave模式,异步复制;从rocketmq的conf目录中可以做区分:

在conf目录中可以看到以下三个目录

说明:
2m-noslave: 多Master模式
2m-2s-sync: 多Master多Slave模式,同步双写
2m-2s-async:多Master多Slave模式,异步复制

配置过程基本都类似,下文只介绍多Master多Slave模式,同步双写的方式:

1、环境介绍:
centos 7.5
Java 1.8
rocketmq 3.5.8

10.0.0.1 name1
10.0.0.2 name2
10.0.0.3 name3
10.0.0.4 name4

Linux Java 环境配置这里不在赘述了,相信大家都会配置的

2、配置详情
进入name1机器
name1:

创建数据目录:
mkdir /data
下载rocketmq
#cd /usr/local
# wget http://mirror.bit.edu.cn/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
#unzip rocketmq-all-4.3.2-bin-release.zip
#mv rocketmq-all-4.3.2-bin-release rocketmq
进入目录编辑配置文件
#cd /usr/local/rocketmq/conf
#vim /usr/local/rocketmq/conf/2m-2s-sync/broker-master1.properties
brokerClusterName=TEST1
brokerName=broker-master1
brokerId=0
namesrvAddr=name1:9876;name2:9876;name3:9876;name4:9876
brokerIP1=name1
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=128
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88

storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128

brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

启动服务:
#cd /usr/local/rocketmq/bin
#nohup sh mqnamesrv &
#nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-master1.properties autoCreateTopicEnable=true > /dev/null 2>&1 &

name2:
创建数据目录:
mkdir /data
下载rocketmq
#cd /usr/local
# wget http://mirror.bit.edu.cn/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
#unzip rocketmq-all-4.3.2-bin-release.zip
#mv rocketmq-all-4.3.2-bin-release rocketmq

进入目录编辑配置文件
#cd /usr/local/rocketmq/conf
#vim /usr/local/rocketmq/conf/2m-2s-sync/broker-master2.properties
brokerClusterName=TEST1
brokerName=broker-master2
brokerId=0
namesrvAddr=name1:9876;name2:9876;name3:9876;name4:9876
brokerIP1=name2
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=128
#fileReservedTime=48
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88

storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=1
flushConsumeQueueLeastPages=1
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128

brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

启动服务:
#cd /usr/local/rocketmq/bin
#nohup sh mqnamesrv &
#nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-master2.properties autoCreateTopicEnable=true > /dev/null 2>&1 &

name3:
创建数据目录:
mkdir /data
下载rocketmq
#cd /usr/local
# wget http://mirror.bit.edu.cn/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
#unzip rocketmq-all-4.3.2-bin-release.zip
#mv rocketmq-all-4.3.2-bin-release rocketmq

进入目录编辑配置文件
#cd /usr/local/rocketmq/conf
#vim /usr/local/rocketmq/conf/2m-2s-sync/broker-slave1.properties
brokerClusterName=TEST1
brokerName=broker-master1
brokerId=1
namesrvAddr=name1:9876;name2:9876;name3:9876;name4:9876
brokerIP1=name3
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88

storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128

brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

启动服务:
#cd /usr/local/rocketmq/bin
#nohup sh mqnamesrv &
#nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-slave1.properties autoCreateTopicEnable=true > /dev/null 2>&1 &

name4:
创建数据目录:
mkdir /data
下载rocketmq
#cd /usr/local
# wget http://mirror.bit.edu.cn/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
#unzip rocketmq-all-4.3.2-bin-release.zip
#mv rocketmq-all-4.3.2-bin-release rocketmq

进入目录编辑配置文件
#cd /usr/local/rocketmq/conf
#vim /usr/local/rocketmq/conf/2m-2s-sync/broker-slave2.properties
brokerClusterName=TEST1
brokerName=broker-master2
brokerId=1
namesrvAddr=name1:9876;name2:9876;name3:9876;name4:9876
brokerIP1=name4
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88

storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000

checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128

brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

启动服务:
#cd /usr/local/rocketmq/bin
#nohup sh mqnamesrv &
#nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-slave2.properties autoCreateTopicEnable=true > /dev/null 2>&1 &

执行上面命令后就完成了rocketmq集群的部署。

配置文件详解:

所属集群名字
附加:如果有多个master,那么每个master配置的名字应该一样,要不然识别不了对方,不知道是一个集群内部的
brokerClusterName=rocketmq-cluster
此处需手动更改
broker名字,注意此处不同的配置文件填写的不一样
附加:按配置文件文件名来匹配
brokerName=broker-a
0 表示Master, > 0 表示slave
brokerId=0
此处许手动更改
(此处nameserver跟host配置相匹配,9876为默认rk服务默认端口)nameServer 地址,分号分割
附加:broker启动时会跟nameserver建一个长连接,broker通过长连接才会向nameserver发新建的topic主题,然后java的客户端才能跟nameserver端发起长连接,向nameserver索取topic,找到topic主题之后,判断其所属的broker,建立长连接进行通讯,这是一个至关重要的路由的概念,重点,也是区别于其它版本的一个重要特性
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
Broker 对外服务的监听端口
listenPort=10911
删除文件时间点,默认是凌晨4点
deleteWhen=04
文件保留时间,默认48小时
fileReservedTime=120
commitLog每个文件的大小默认1G
附加:消息实际存储位置,和ConsumeQueue是mq的核心存储概念,之前搭建2m环境的时候创建在store下面,用于数据存储,consumequeue是一个逻辑的概念,消息过来之后,consumequeue并不是把消息所有保存起来,而是记录一个数据的位置,记录好之后再把消息存到commitlog文件里
mapedFileSizeCommitLog=1073741824
ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
存储路径
storePathRootDir=/usr/local/rocketmq/store
commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000

Broker 的角色
ASYNC_MASTER 异步复制Master
SYNC_MASTER 同步双写Master
SLAVE
brokerRote=ASYNC_MASTER

刷盘方式
ASYNC_FLUSH 异步刷盘
SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

checkTransactionMessageEnable=false
发消息线程池数量
sendMessageTreadPoolNums=128
拉消息线程池数量
pullMessageTreadPoolNums=128

参考链接:https://www.jianshu.com/p/6a6b89f7365a

3、常见命令

1. 登录控制台: 首先进入 RocketMQ 工程,进入/RocketMQ/bin 在该目录下有个 mqadmin 脚本 .
查看帮助: 在 mqadmin 下可以查看有哪些命令
a: 查看具体命令的使用 : sh mqadmin
b: sh mqadmin help 命令名称
例如,查看 updateTopic 的使用
sh mqadmin help updateTopic
2. 关闭nameserver和所有的broker:
进入到bin下:
sh mqshutdown namesrv
sh mqshutdown broker
3. 查看所有消费组group:
sh mqadmin consumerProgress -n 192.168.23.159:9876
4. 查看指定消费组下的所有topic数据堆积情况:
sh mqadmin consumerProgress -n 192.168.23.159:9876 -g ywdGroupConsumer
5. 查看所有topic :
sh mqadmin topicList -n 192.168.23.159:9876
6. 查看topic信息列表详情统计
sh mqadmin topicstatus -n 192.168.23.159:9876 -t myTopicTest1
7. 新增topic
sh mqadmin updateTopic –n 10.45.47.168 –c DefaultCluster –t ZTEExample
8. 删除topic
sh mqadmin deleteTopic –n 10.45.47.168:9876 –c DefaultCluster –t ZTEExample

更详细的可以看这个文档:https://blog.csdn.net/zhu_tianwei/article/details/40951301

4、rocketmq监控工具

所需环境:java,mvn
机器IP:10.0.0.5

在此处下载rocketmq-console,https://github.com/apache/rocketmq-externals

下载后进入目录内执行命令:
#mvn clean package -Dmaven.test.skip=true

执行后会在target目录下看到rocketmq-console-ng-1.0.0.jar文件,执行命令启动rocketmq-console
#java -jar rocketmq-console-ng-1.0.0.jar –server.port=8888 –rocketmq.config.namesrvAddr=name1:9876

在浏览器访问:http://10.0.0.5:8888查看结果

5、配置中常见的问题汇总

参考:
http://www.leexide.com/post/RocketMQ%E9%9B%86%E7%BE%A4%E9%83%A8%E7%BD%B2%E6%96%B9%E5%BC%8F%E6%80%BB%E7%BB%93
https://www.jianshu.com/p/824066d70da8

6、在使用中的问题:
在代码中如何做到主从分离,提高mq的利用率?

麻烦知道上面问题的请指导,感谢;如上面的文档有问题的也可直接联系交流纠错,感谢
1421519484@qq.com

发表评论