RabbitMQ 多源配置

在本地安装RabbitMQ服务或者任意个人服务器安装server

首先查看服务器版本-根据版本下载对应的MQ

lsb_release -a

查看对应服务器版本MQ以及Erlang OTP/

RabbitMQ Erlang Version Requirements

以Centos7为例

  • 系统环境

    • JDK1.8
    • Centos7-64
    • Erlang-OTP 23
    • RabbitMQ-3.8.5

安装Erlang

  • 通过rpm 安装Erlang

    curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
    
  • 安装Erlang

    yum install -y erlang
    
  • 查看erl版本号

    erl
    

  • Erlang 安装完成

安装RabbitMQ

  • 导入key
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
rpm --import https://packagecloud.io/gpg.key
  • 设置RabbitMQ 前置条件
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  • 下载RabbitMQ
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
  • 访问链接下载后,将rpm包上传至服务器-导入key
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
  • 安装socat
yum -y install epel-release
yum -y install socat
  • 安装RabbitMQ rpm 文件
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
  • 启用管理平台插件,启用插件后,可以可视化管理RabbitMQ
rabbitmq-plugins enable rabbitmq_management
  • 启动MQ
systemctl start rabbitmq-server
  • 访问控制图形化界面 -> IP:15672

  • MQ 默认账号guest 密码 guest
  • 创建专属账号进行赋权使用-> 账号:admin 密码:admin
rabbitmqctl add_user admin admin
  • 设置admin为超级管理员
rabbitmqctl set_user_tags admin administrator
  • 授权远程访问(也可以登录后,可视化配置)
rabbitmqctl set_permissions -p / admin "." "." ".*"
  • 创建完成后,重启RabbitMQ
systemctl restart rabbitmq-server

代码配置

引入依赖

<!-- rabbit mq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.10.RELEASE</version>
</dependency>

application.yml 配置

spring:
  port: 8088
  rabbitmq:
    first:
      host: 42.156.222.164
      port: 5672
      username: admin
      password: admin
        #消费端配置
      listener:
        simple:
          concurrency: 10  #消费端
          max-concurrency: 20 #最大消费端数
          acknowledge-mode: auto #自动签收auto  手动 manual
          prefetch: 1 #限流(海量数据,同时只能过来一条)
    second:
      host: localhost
      port: 5672
      username: admin
      password: admin

程序启动类

package com.study;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class MqApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqApplication.class, args);
    }
}

RabbitMQ 配置类

package com.study.mq.rabbitmqConfig;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RabbitConfig {

    @Bean(name = "firstConnectionFactory")
    @Primary
    public ConnectionFactory firstConnectionFactory(
            @Value("${spring.rabbitmq.first.host}") String host,
            @Value("${spring.rabbitmq.first.port}") int port,
            @Value("${spring.rabbitmq.first.username}") String username,
            @Value("${spring.rabbitmq.first.password}") String password
    ) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean(name = "secondConnectionFactory")
    public ConnectionFactory secondConnectionFactory(
            @Value("${spring.rabbitmq.second.host}") String host,
            @Value("${spring.rabbitmq.second.port}") int port,
            @Value("${spring.rabbitmq.second.username}") String username,
            @Value("${spring.rabbitmq.second.password}") String password
    ) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean(name = "firstRabbitTemplate")
    @Primary
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
    ) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
    ) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean(name = "firstFactory")
    public SimpleRabbitListenerContainerFactory firstFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

消费者1

package com.study.mq.Receiver;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.study.mq.web.bean.entity.MqMsg;
import com.study.mq.web.mapper.MqMsgMapper;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


/**
 * RabbitMQ中的消费者,接收first RabbitMQ中的队列first的数据
 */
@Component
public class Receiver {

    @Resource
    private MqMsgMapper mapper;

    @RabbitListener(queues = "first", containerFactory = "firstFactory")
    @RabbitHandler
    public void process(String msg) {
        final JSONObject object = JSONUtil.parseObj(msg);
        final String context = object.getStr("context");
        System.out.println("Receiver : " + context);
        final MqMsg mqMsg = new MqMsg();
        mqMsg.setContext(context);
        mapper.insert(mqMsg);
    }
}

消费者2

package com.study.mq.Receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * RabbitMQ中的消费者,接收second RabbitMQ中的队列second的数据
 */
@Component
public class Receiver2 {

    @RabbitListener(queues = "second", containerFactory = "secondFactory")
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Receiver : " + msg);
    }
}

生产者1

package com.study.mq.sender;

import java.util.Date;
import java.util.HashMap;
import javax.annotation.Resource;

import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;


/**
 * RabbitMQ中的生产者,发送消息到RabbitMQ中first队列
 */
@Component
public class FirstSender {

    @Resource(name="firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    public void send1() {
        final HashMap<Object, Object> map = MapUtil.newHashMap();
        map.put("context", "一条消息");
        final String msg = JSONUtil.toJsonStr(map);
        this.firstRabbitTemplate.convertAndSend("first","firstDirectRouting", msg);
    }
}

生产者2

package com.study.mq.sender;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;


/**
 * RabbitMQ中的生产者,发送消息到RabbitMQ中的second队列
 */
//@Component
@RestController
public class SecondSender {

    @Resource(name = "secondRabbitTemplate")
    private RabbitTemplate secondRabbitTemplate;

    public void send1() {
        String context = "第一次发送 " + new Date();
        System.out.println("Sender : " + context);
        this.secondRabbitTemplate.convertAndSend("second","secondRoutingKey", context);
    }
}

程序测试

package com.study.mq;

import com.study.mq.sender.FirstSender;
import com.study.mq.sender.SecondSender;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class TestDemo {

    @Resource
    private FirstSender firstSender;

    @Resource
    private SecondSender secondSender;


    @GetMapping("/firstSend")
    public void firstSend() throws Exception {
        firstSender.send1();
    }

    @GetMapping("/secondSend")
    public void secondSend() throws Exception {
        secondSender.send1();
    }
}
  • 启动项目, 调用 /firstSend 接口向第一个first 源发送一条消息

  • 测试项目时先注释消费者 @RabbitListener 监听注解,这样发送消息后不至于被马上消费
  • 消息发送成功后,再放开@RabbitListener注释,重新启动项目可以查看到消息已被消费
上一篇 下一篇