springboot整合rabbitMQ

寻技术 JAVA编程 / 其他编程 2023年07月11日 71

1.生产者工程

  • pom.xml里引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • application.yml里配置基本信息

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: ******
        password: ******
        virtual-host: /test
    
  • 在配置类里创建交换机,队列,绑定交换机和队列

    package com.min.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        // 创建交换机
        @Bean("topicExchange")
        public Exchange exchange() {
            return ExchangeBuilder.topicExchange("springboot-topic-exchange").durable(true).build();
        }
    
        // 创建队列
        @Bean("queue1")
        public Queue queue1() {
            return QueueBuilder.durable("springboot-queue1").build();
        }
    
        @Bean("queue2")
        public Queue queue2() {
            return QueueBuilder.durable("springboot-queue2").build();
        }
    
        // 绑定队列和交换机
        @Bean
        public Binding BindQueue1Exchange(@Qualifier("queue1") Queue queue,
                                          @Qualifier("topicExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("#.error").noargs();
        }
    
        @Bean
        public Binding BindQueue2Exchange(@Qualifier("queue2") Queue queue,
                                          @Qualifier("topicExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("order.*").noargs();
        }
    
    }
    
  • 注入RabbitTemplate发消息

    package com.min;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class SpringbootRabbitmqProducerApplicationTests {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        void testSend1() {
            rabbitTemplate.convertAndSend("springboot-topic-exchange","boot.error","boot.error的key发送的消息");
        }
    
        @Test
        void testSend2() {
            rabbitTemplate.convertAndSend("springboot-topic-exchange","order.error","order.error的key发送的消息");
        }
    
        @Test
        void testSend3() {
            rabbitTemplate.convertAndSend("springboot-topic-exchange","order.insert","order.insert的key发送的消息");
        }
    
    }
    

2.消费者工程

  • 引入依赖并且配置基本信息,和生产者一样

  • 创建两个监听器,分别监听两个队列

    package com.min.listener;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitMQListener1 implements MessageListener {
    
        @Override
        @RabbitListener(queues = "springboot-queue1")
        public void onMessage(Message message) {
            System.out.println("springboot-queue1接收到消息:" + new String(message.getBody()));
        }
    }
    
    package com.min.listener;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitMQListener2 implements MessageListener {
    
        @Override
        @RabbitListener(queues = "springboot-queue2")
        public void onMessage(Message message) {
            System.out.println("springboot-queue2接收到消息:" + new String(message.getBody()));
        }
    }
    
  • 启动消费者工程,然后分别运行生产者工程里的三个测试方法,结果如下

    springboot-queue1接收到消息:boot.error的key发送的消息
    
    springboot-queue1接收到消息:order.error的key发送的消息
    springboot-queue2接收到消息:order.error的key发送的消息
    
    springboot-queue2接收到消息:order.insert的key发送的消息
    

    从以上结果可以看出,routingkey为boot.error的消息只能被springboot-queue1接收到,routingkey为order.insert的消息只能被springboot-queue2接收到,而routingkey为order.error的消息两个队列都能接收到。(注意:本案例使用的是rabbitmq的topic工作模式)

关闭

用微信“扫一扫”