SpringBoot实战 (八)| RocketMq安装与整合使用

Scroll Down

前言

这篇开始给大家带来的是消息中间件模块,分别给大家带来RocketMq,其他中间件请看kafkaraabbtMq文章

一、安装

1.1、安装环境说明

  • Linux/Unix/Mac
  • 64bit JDK 1.8+ (必须)
  • maven
  • rocketmq-all-4.8.0-bin-release

大家如果发现失败了,可能是你的jdk没配置好,别以为能启动java程序就是好的,具体环境变量请转到这篇文章

1.2、下载源码

官网下载
Git下载
87355B11B11549F38868F4CC727315C0.png

1.3、解压

找到一个你喜欢的目录解压

unzip rocketmq-all-4.7.1-source-release.zip 

1.3、编译(可省略)

官网说需要使用maven编译整个项目

mvn -Prelease-all -DskipTests clean install -U 

项目部署完成后,可以进入 cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 目录查看

1.4、修改runserver.sh和runbroker.sh文件

因为rocketMQ默认的启动参数内存占用非常大,如果环境没有这么多内存就必需修改JAVA_OPT参数

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

1.5、启动及查看日志

# Name Server
nohup sh bin/mqnamesrv &
# broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

查看运行日志:tail -f ~/logs/rocketmqlogs/namesrv.log
通过-c参数指定配置文件
查看运行日志:tail -f ~/logs/rocketmqlogs/broker.log

1.6、看到 “ The Name Server boot success. serializeType=JSON ” 表示启动成功

失败一般就是内存问题,修改启动脚本的内存大小

1.7、测试

出现以下就代表成功
image.png

1.8、停止

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

二、安装可视化管理界面

mvn clean package -Dmaven.test.skip=true
  • 运行
nohup java -jar \
    -Drocketmq.config.namesrvAddr=192.168.28.130:9876 \
    -Drocketmq.config.isVIPChannel=false \
    rocketmq-console-ng-1.0.0.jar &

三、SpringBoot整合RocketMq

  • SpringBoot2.3.4
  • rocketmq4.8.0
  • JDK1.8 +Lombok(插件)
  • grade

3.1、依赖

compile("org.apache.rocketmq:rocketmq-spring-boot-starter:2.0.3")
compile("com.alibaba:fastjson:1.2.44")

我这里采用的是生产者和请求端分离

3.2、生产者

  • 配置文件
# 开发环境配置
server:
  # 服务端口
  port: 8008

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my-group
    send-message-timeout: 300000
    compress-message-body-threshold: 4096
    max-message-size: 4194304
    retry-next-server: true
    retry-times-when-send-failed: 2
  • 接口测试
@RestController
public class TestController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("test1")
    public String test1(){
        String name = "aaa";
        OrderPaidEvent order = new OrderPaidEvent("1111","22222");
        rocketMQTemplate.convertAndSend("test-topic-1", name);
        System.err.println("生产者给主题:test-topic-1,发送成功...。消息体:"+ name);

        rocketMQTemplate.send("test-topic-2", MessageBuilder.withPayload(order).build());
        System.err.println("生产者给主题:test-topic-2,发送成功...。消息体:"+ JSON.toJSONString(order));
        return "成功";
    }
}
  • bean形式传递
public class OrderPaidEvent {
    private String name;

    private String password;

    public OrderPaidEvent(String name, String password) {
        this.name = name;
        this.password = password;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

3.3、消费者

  • 配置文件
# 开发环境配置
server:
  # 服务端口
  port: 8009

rocketmq:
  name-server: 127.0.0.1:9876

我这里用两种来接收

  • 主题一
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class OrderPaidEventConsumer implements RocketMQListener<OrderPaidEvent> {

    Logger logger = LoggerFactory.getLogger(OrderPaidEventConsumer.class);

    @Override
    public void onMessage(OrderPaidEvent message) {
        logger.error("------- 消费者,主题:test-topic-2 received:{}", message);
    }

}
  • 主题二
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class OrderPaidEventOneConsumer implements RocketMQListener<String> {

    Logger logger = LoggerFactory.getLogger(OrderPaidEventOneConsumer.class);

    @Override
    public void onMessage(String message) {
        logger.error("------- 消费者,主题:test-topic-1 received:{}", message);
    }

}