Publicado em: segunda-feira, 20 de jan de 2025
Implementando Delay no RabbitMQ com Oragon.RabbitMQ sem Delayed Exchange

Um assunto recorrente quando falamos de RabbitMQ é a implementação de delay.

Embora tenhamos o delayed exchange, existem questões relacionadas a cluster que precisam ser consideradas e por isso não é algo que aborde, além de ser algo instalado à parte.

Existem muitos motivos para evitarmos esse tipo de exchange e na apresentação do Clayton Cavaleiro sobre “Servindo 1MM de mensagens simultâneas com RabbitMQ” vimos alguns desses problemas.

Assim vamos abordar uma solução alternativa, usando uma fila auxiliar.

Entendendo Oragon.RabbitMQ

O projeto consiste em implementar Minimal API’s para consumir filas do RabbitMQ, e aqui temos a diferença entre um consumidor normal e a implementação com Oragon.RabbitMQ.

A criação de AMQPResults que dão suporte à essa implementação sustentou a possibilidade de um AMQPResult capaz de enviar uma mensagem para uma fila de delay.

É um modelo similar ao encontrado nas Mininal API’s, e copiei para entregar a mesma experiência de uso.

Dessa forma, o MapQueue permite que você lide na Minimal API com os tratamentos necessários, sem proliferar, propagar e assim ferir as responsabilidades da sua camada de serviço/negócio com conteúdo relacionado ao RabbitMQ.

A implementação do DelayedResult é uma implementação que não faz parte da library, faz parte do ACADEMIA PAY e está licenciado sob MIT (licença permissiva) e é uma das possibilidades de uso.

Abaixo segue o código de exemplo.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
// Licensed to LuizCarlosFaria,, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.
using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
namespace AcademiaPay.Messaging;
public class DelayedResult : IAMQPResult
private readonly TimeSpan ttl;
/// <summary>
/// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
/// </summary>
/// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
public DelayedResult(TimeSpan ttl)
this.ttl = ttl;
public async Task ExecuteAsync(IAmqpContext context)
ArgumentNullException.ThrowIfNull(context, nameof(context));
var delayedQueueName = $"{context.QueueName}.delayed";
var properties = new BasicProperties
// Copy all properties from old BasicProperties to new BasicProperties
AppId = context.Request.BasicProperties.AppId,
ClusterId = context.Request.BasicProperties.ClusterId,
ContentEncoding = context.Request.BasicProperties.ContentEncoding,
ContentType = context.Request.BasicProperties.ContentType,
CorrelationId = context.Request.BasicProperties.CorrelationId,
DeliveryMode = context.Request.BasicProperties.DeliveryMode,
Expiration = context.Request.BasicProperties.Expiration,
Headers = context.Request.BasicProperties.Headers,
MessageId = context.Request.BasicProperties.MessageId,
Persistent = context.Request.BasicProperties.Persistent,
Priority = context.Request.BasicProperties.Priority,
ReplyTo = context.Request.BasicProperties.ReplyTo,
Timestamp = context.Request.BasicProperties.Timestamp,
Type = context.Request.BasicProperties.Type,
UserId = context.Request.BasicProperties.UserId,
ReplyToAddress = context.Request.BasicProperties.ReplyToAddress
// Only TTL will be changed
properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString();
await context.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: delayedQueueName,
mandatory: true,
basicProperties: properties,
body: context.Request.Body).AsTask().ConfigureAwait(false);
await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false);
// Licensed to LuizCarlosFaria,, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements. // The ACADEMIA.DEV licenses this file to you under the MIT license. using global::Oragon.RabbitMQ.Consumer.Actions; using global::Oragon.RabbitMQ.Consumer; using RabbitMQ.Client; using System.Text; using System.Threading.Tasks; namespace AcademiaPay.Messaging; public class DelayedResult : IAMQPResult { private readonly TimeSpan ttl; /// <summary> /// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL. /// </summary> /// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param> public DelayedResult(TimeSpan ttl) { this.ttl = ttl; } public async Task ExecuteAsync(IAmqpContext context) { ArgumentNullException.ThrowIfNull(context, nameof(context)); var delayedQueueName = $"{context.QueueName}.delayed"; var properties = new BasicProperties { // Copy all properties from old BasicProperties to new BasicProperties AppId = context.Request.BasicProperties.AppId, ClusterId = context.Request.BasicProperties.ClusterId, ContentEncoding = context.Request.BasicProperties.ContentEncoding, ContentType = context.Request.BasicProperties.ContentType, CorrelationId = context.Request.BasicProperties.CorrelationId, DeliveryMode = context.Request.BasicProperties.DeliveryMode, Expiration = context.Request.BasicProperties.Expiration, Headers = context.Request.BasicProperties.Headers, MessageId = context.Request.BasicProperties.MessageId, Persistent = context.Request.BasicProperties.Persistent, Priority = context.Request.BasicProperties.Priority, ReplyTo = context.Request.BasicProperties.ReplyTo, Timestamp = context.Request.BasicProperties.Timestamp, Type = context.Request.BasicProperties.Type, UserId = context.Request.BasicProperties.UserId, ReplyToAddress = context.Request.BasicProperties.ReplyToAddress }; // Only TTL will be changed properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString(); await context.Channel.BasicPublishAsync( exchange: string.Empty, routingKey: delayedQueueName, mandatory: true, basicProperties: properties, body: context.Request.Body).AsTask().ConfigureAwait(false); await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false); } }
// Licensed to LuizCarlosFaria,, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.

using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;

namespace AcademiaPay.Messaging;

public class DelayedResult : IAMQPResult
    private readonly TimeSpan ttl;

    /// <summary>
    /// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
    /// </summary>
    /// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
    public DelayedResult(TimeSpan ttl)
        this.ttl = ttl;

    public async Task ExecuteAsync(IAmqpContext context)
        ArgumentNullException.ThrowIfNull(context, nameof(context));

        var delayedQueueName = $"{context.QueueName}.delayed";

        var properties = new BasicProperties
            // Copy all properties from old BasicProperties to new BasicProperties
            AppId = context.Request.BasicProperties.AppId,
            ClusterId = context.Request.BasicProperties.ClusterId,
            ContentEncoding = context.Request.BasicProperties.ContentEncoding,
            ContentType = context.Request.BasicProperties.ContentType,
            CorrelationId = context.Request.BasicProperties.CorrelationId,
            DeliveryMode = context.Request.BasicProperties.DeliveryMode,
            Expiration = context.Request.BasicProperties.Expiration,
            Headers = context.Request.BasicProperties.Headers,
            MessageId = context.Request.BasicProperties.MessageId,
            Persistent = context.Request.BasicProperties.Persistent,
            Priority = context.Request.BasicProperties.Priority,
            ReplyTo = context.Request.BasicProperties.ReplyTo,
            Timestamp = context.Request.BasicProperties.Timestamp,
            Type = context.Request.BasicProperties.Type,
            UserId = context.Request.BasicProperties.UserId,
            ReplyToAddress = context.Request.BasicProperties.ReplyToAddress

        // Only TTL will be changed
        properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString();

        await context.Channel.BasicPublishAsync(
            exchange: string.Empty,
            routingKey: delayedQueueName,
            mandatory: true,
            basicProperties: properties,
            body: context.Request.Body).AsTask().ConfigureAwait(false);

        await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false);


0 comentários

Enviar um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Este site utiliza o Akismet para reduzir spam. Saiba como seus dados em comentários são processados.


Fique de olho nas lives

Fique de olho nas lives no meu canal do Youtube, no Canal .NET e nos Grupos do Facebook e Instagram.


Existem diversas formas de viabilizar o suporte ao teu projeto. Seja com os treinamentos, consultoria, mentorias em grupo.