前言
这篇开始给大家带来的是消息中间件模块,分别给大家带来RocketMq,其他中间件请看kafka、raabbtMq文章
一、安装
1.1、安装环境说明
- Linux/Unix/Mac
- 64bit JDK 1.8+ (必须)
- maven
- rocketmq-all-4.8.0-bin-release
大家如果发现失败了,可能是你的jdk没配置好,别以为能启动java程序就是好的,具体环境变量请转到这篇文章
1.2、下载源码
1.3、解压
找到一个你喜欢的目录解压
unzip rocketmq-all-4.7.1-source-release.zip
1.3、编译(可省略)
官网说需要使用maven编译整个项目
mvn -Prelease-all -DskipTests clean install -U
项目部署完成后,可以进入 cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 目录查看
1.4、修改runserver.sh和runbroker.sh文件
因为rocketMQ默认的启动参数内存占用非常大,如果环境没有这么多内存就必需修改JAVA_OPT参数
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
1.5、启动及查看日志
# Name Server
nohup sh bin/mqnamesrv &
# broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
查看运行日志:tail -f ~/logs/rocketmqlogs/namesrv.log
通过-c参数指定配置文件
查看运行日志:tail -f ~/logs/rocketmqlogs/broker.log
1.6、看到 “ The Name Server boot success. serializeType=JSON ” 表示启动成功
失败一般就是内存问题,修改启动脚本的内存大小
1.7、测试
出现以下就代表成功
1.8、停止
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
二、安装可视化管理界面
- 下载
- 编译
mvn clean package -Dmaven.test.skip=true
- 运行
nohup java -jar \
-Drocketmq.config.namesrvAddr=192.168.28.130:9876 \
-Drocketmq.config.isVIPChannel=false \
rocketmq-console-ng-1.0.0.jar &
三、SpringBoot整合RocketMq
- SpringBoot2.3.4
- rocketmq4.8.0
- JDK1.8 +Lombok(插件)
- grade
3.1、依赖
compile("org.apache.rocketmq:rocketmq-spring-boot-starter:2.0.3")
compile("com.alibaba:fastjson:1.2.44")
我这里采用的是生产者和请求端分离
3.2、生产者
- 配置文件
# 开发环境配置
server:
# 服务端口
port: 8008
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: my-group
send-message-timeout: 300000
compress-message-body-threshold: 4096
max-message-size: 4194304
retry-next-server: true
retry-times-when-send-failed: 2
- 接口测试
@RestController
public class TestController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("test1")
public String test1(){
String name = "aaa";
OrderPaidEvent order = new OrderPaidEvent("1111","22222");
rocketMQTemplate.convertAndSend("test-topic-1", name);
System.err.println("生产者给主题:test-topic-1,发送成功...。消息体:"+ name);
rocketMQTemplate.send("test-topic-2", MessageBuilder.withPayload(order).build());
System.err.println("生产者给主题:test-topic-2,发送成功...。消息体:"+ JSON.toJSONString(order));
return "成功";
}
}
- bean形式传递
public class OrderPaidEvent {
private String name;
private String password;
public OrderPaidEvent(String name, String password) {
this.name = name;
this.password = password;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
3.3、消费者
- 配置文件
# 开发环境配置
server:
# 服务端口
port: 8009
rocketmq:
name-server: 127.0.0.1:9876
我这里用两种来接收
- 主题一
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class OrderPaidEventConsumer implements RocketMQListener<OrderPaidEvent> {
Logger logger = LoggerFactory.getLogger(OrderPaidEventConsumer.class);
@Override
public void onMessage(OrderPaidEvent message) {
logger.error("------- 消费者,主题:test-topic-2 received:{}", message);
}
}
- 主题二
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class OrderPaidEventOneConsumer implements RocketMQListener<String> {
Logger logger = LoggerFactory.getLogger(OrderPaidEventOneConsumer.class);
@Override
public void onMessage(String message) {
logger.error("------- 消费者,主题:test-topic-1 received:{}", message);
}
}