使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能

news/2024/9/20 7:37:07 标签: 开发语言, 后端, rabbitmq

在 Spring Boot 项目中整合 RabbitMQ 并使用死信队列(Dead Letter Exchange, DLX)实现延迟队列功能,可以通过配置 RabbitMQ 的交换机、队列和绑定关系来实现。以下是一个完整的示例,展示如何在 Spring Boot 项目中使用死信队列实现延迟队列。

1. 添加依赖

在 pom.xml 文件中添加 RabbitMQ 的依赖:

xml

复制

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

运行 HTML

2. 配置 RabbitMQ

在 application.yml 文件中配置 RabbitMQ 的连接信息:

yaml

复制

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3. 创建 RabbitMQ 配置类

创建一个配置类,用于定义交换机、队列和绑定关系:

java

复制

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    public static final String DELAY_QUEUE_NAME = "delay.queue";
    public static final String DLX_EXCHANGE_NAME = "dlx.exchange";
    public static final String DLX_QUEUE_NAME = "dlx.queue";

    // 创建死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME);
    }

    // 创建死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE_NAME);
    }

    // 绑定死信队列到死信交换机
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("");
    }

    // 创建延迟交换机
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 创建延迟队列,并设置死信交换机
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        return new Queue(DELAY_QUEUE_NAME, false, false, false, args);
    }

    // 绑定延迟队列到延迟交换机
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("");
    }
}

4. 发送延迟消息

创建一个消息发送服务,用于发送延迟消息:

java

复制

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendDelayMessage(String message, long delayTime) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, "", message, msg -> {
            msg.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return msg;
        });
        System.out.println(" [x] Sent '" + message + "' with delay " + delayTime + " ms");
    }
}

5. 消费延迟消息

创建一个消息消费者,用于消费延迟消息:

java

复制

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQReceiver {

    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)
    public void receiveDelayMessage(String message) {
        System.out.println(" [x] Received '" + message + "'");
    }
}

6. 测试延迟队列

创建一个测试类,用于测试延迟队列的功能:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQTest implements CommandLineRunner {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Override
    public void run(String... args) throws Exception {
        String message = "Hello, RabbitMQ Delay Queue!";
        long delayTime = 5000; // 延迟 5 秒
        rabbitMQSender.sendDelayMessage(message, delayTime);
    }
}

7. 运行项目

启动 Spring Boot 项目,观察控制台输出,可以看到消息在延迟 5 秒后被消费。

总结

  • 配置 RabbitMQ:在 application.yml 文件中配置 RabbitMQ 的连接信息。

  • 创建 RabbitMQ 配置类:定义交换机、队列和绑定关系,设置死信交换机。

  • 发送延迟消息:通过 AmqpTemplate 发送带有 TTL 的消息。

  • 消费延迟消息:通过 @RabbitListener 注解消费死信队列中的消息。

通过这种方式,可以在 Spring Boot 项目中使用死信队列实现延迟队列功能。需要注意的是,消息的 TTL 是以毫秒为单位的,且消息的延迟时间不能超过 RabbitMQ 的最大消息大小限制。

上面代码没看懂的,看这里,以下是详细的流程:

  1. 发送延迟消息

    • 生产者将消息发送到延迟交换机(delay.exchange),并设置消息的 TTL。

    • 消息被路由到延迟队列(delay.queue)。

  2. 延迟队列处理

    • 延迟队列没有消费者,消息会在队列中等待,直到 TTL 到期。

    • TTL 到期后,消息被转发到死信交换机(dlx.exchange)。

  3. 死信队列处理

    • 死信交换机将消息路由到死信队列(dlx.queue)。

    • 死信队列的消费者消费这些消息。


http://www.niftyadmin.cn/n/5666823.html

相关文章

tensorflow-dataset 内网下载 指定目录

内网下载报错 解决办法是设置环境变量&#xff0c;指向你的代理服务器TFDS_HTTP_PROXYhttp://xxx、TFDS_HTTPS_PROXYhttp://xxx。 留意到&#xff0c;赋值的是你的代理服务器&#xff0c;且最好协议都使用http(即使TFDS_HTTPS_PROXY也要使用http协议连服务器)。如果不这么做&a…

zookeeper向管控平台上报状态

问题 在你的场景中&#xff0c;由于 Django 应用启动了 4 个 uWSGI 进程&#xff0c;每个进程都会创建一个节点并上报状态&#xff0c;因此出现了 4 次状态上报的情况。这在大多数情况下是不合理的&#xff0c;尤其是在你只期望应用上报一次状态时。 要解决这个问题并优雅地进…

医学数据分析实训 项目十 基于深度残差神经网络的皮肤癌检测

文章目录 综合实践三 基于深度残差神经网络的皮肤癌检测实现步骤1&#xff1a;图像数据预处理实现步骤2&#xff1a;模型构建实现步骤3&#xff1a;性能度量提交要求 1 基于深度残差神经网络的皮肤癌检测代码2 结果分析 综合实践三 基于深度残差神经网络的皮肤癌检测 皮肤镜图…

什么是 SSL 代理?

您可能已经对代理有所了解&#xff0c;例如移动代理、住宅代理和数据中心代理之间的区别。但是 SSL 代理到底是什么&#xff1f;它与其他类型的代理相比有何不同&#xff1f; 让我们分析一下&#xff0c;看看 SSL 代理有何特殊之处。 1.什么是 SSL/HTTPS 代理&#xff1f; SS…

Linux 内核的版本控制

Linux 内核的版本控制以及确保模块兼容性是开发和维护 Linux 系统时非常重要的一部分。 Linux 内核采用了语义版本控制&#xff08;Semantic Versioning&#xff09;&#xff0c;通常由三个部分组成&#xff1a; 主版本号&#xff08;major&#xff09;、次版本号&#xff08;m…

c++ static(详解)

在C中&#xff0c;static关键字用于定义具有静态存储持续时间的变量或方法。它在不同上下文中有不同的含义&#xff0c;通常与变量的存储方式和作用域相关。static的主要作用是控制变量或函数的可见性和生命周期。 可以通过一个生活中的场景来形象化static的作用&#xff1a; …

计算机毕业设计 奖学金评定管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

Python 复制Excel 中的行、列、单元格

在Excel中&#xff0c;复制行、列和单元格是日常工作中经常需要进行的操作&#xff0c;它可以帮助你快速调整数据布局、复制数据模板或进行数据的批量处理。 本文将详细介绍如何使用Python将Excel中的行、列、或单元格范围复制到指定位置。 所需Python库 要使用Python操作Exc…