Publicado em: segunda-feira, 20 de jan de 2025
Implementando Delay no RabbitMQ com Oragon.RabbitMQ sem Delayed Exchange

Um assunto recorrente quando falamos de RabbitMQ é a implementação de delay.

Embora tenhamos o delayed exchange, existem questões relacionadas a cluster que precisam ser consideradas e por isso não é algo que aborde, além de ser algo instalado à parte.

Existem muitos motivos para evitarmos esse tipo de exchange e na apresentação do Clayton Cavaleiro sobre “Servindo 1MM de mensagens simultâneas com RabbitMQ” vimos alguns desses problemas.

Assim vamos abordar uma solução alternativa, usando uma fila auxiliar.

Entendendo Oragon.RabbitMQ

O projeto consiste em implementar Minimal API’s para consumir filas do RabbitMQ, e aqui temos a diferença entre um consumidor normal e a implementação com Oragon.RabbitMQ.

A criação de AMQPResults que dão suporte à essa implementação sustentou a possibilidade de um AMQPResult capaz de enviar uma mensagem para uma fila de delay.

É um modelo similar ao encontrado nas Mininal API’s, e copiei para entregar a mesma experiência de uso.

Dessa forma, o MapQueue permite que você lide na Minimal API com os tratamentos necessários, sem proliferar, propagar e assim ferir as responsabilidades da sua camada de serviço/negócio com conteúdo relacionado ao RabbitMQ.

A implementação do DelayedResult é uma implementação que não faz parte da library, faz parte do ACADEMIA PAY e está licenciado sob MIT (licença permissiva) e é uma das possibilidades de uso.

Abaixo segue o código de exemplo.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.
using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
namespace AcademiaPay.Messaging;
public class DelayedResult : IAMQPResult
{
private readonly TimeSpan ttl;
/// <summary>
/// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
/// </summary>
/// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
public DelayedResult(TimeSpan ttl)
{
this.ttl = ttl;
}
public async Task ExecuteAsync(IAmqpContext context)
{
ArgumentNullException.ThrowIfNull(context, nameof(context));
var delayedQueueName = $"{context.QueueName}.delayed";
var properties = new BasicProperties
{
// Copy all properties from old BasicProperties to new BasicProperties
AppId = context.Request.BasicProperties.AppId,
ClusterId = context.Request.BasicProperties.ClusterId,
ContentEncoding = context.Request.BasicProperties.ContentEncoding,
ContentType = context.Request.BasicProperties.ContentType,
CorrelationId = context.Request.BasicProperties.CorrelationId,
DeliveryMode = context.Request.BasicProperties.DeliveryMode,
Expiration = context.Request.BasicProperties.Expiration,
Headers = context.Request.BasicProperties.Headers,
MessageId = context.Request.BasicProperties.MessageId,
Persistent = context.Request.BasicProperties.Persistent,
Priority = context.Request.BasicProperties.Priority,
ReplyTo = context.Request.BasicProperties.ReplyTo,
Timestamp = context.Request.BasicProperties.Timestamp,
Type = context.Request.BasicProperties.Type,
UserId = context.Request.BasicProperties.UserId,
ReplyToAddress = context.Request.BasicProperties.ReplyToAddress
};
// Only TTL will be changed
properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString();
await context.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: delayedQueueName,
mandatory: true,
basicProperties: properties,
body: context.Request.Body).AsTask().ConfigureAwait(false);
await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false);
}
}
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements. // The ACADEMIA.DEV licenses this file to you under the MIT license. using global::Oragon.RabbitMQ.Consumer.Actions; using global::Oragon.RabbitMQ.Consumer; using RabbitMQ.Client; using System.Text; using System.Threading.Tasks; namespace AcademiaPay.Messaging; public class DelayedResult : IAMQPResult { private readonly TimeSpan ttl; /// <summary> /// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL. /// </summary> /// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param> public DelayedResult(TimeSpan ttl) { this.ttl = ttl; } public async Task ExecuteAsync(IAmqpContext context) { ArgumentNullException.ThrowIfNull(context, nameof(context)); var delayedQueueName = $"{context.QueueName}.delayed"; var properties = new BasicProperties { // Copy all properties from old BasicProperties to new BasicProperties AppId = context.Request.BasicProperties.AppId, ClusterId = context.Request.BasicProperties.ClusterId, ContentEncoding = context.Request.BasicProperties.ContentEncoding, ContentType = context.Request.BasicProperties.ContentType, CorrelationId = context.Request.BasicProperties.CorrelationId, DeliveryMode = context.Request.BasicProperties.DeliveryMode, Expiration = context.Request.BasicProperties.Expiration, Headers = context.Request.BasicProperties.Headers, MessageId = context.Request.BasicProperties.MessageId, Persistent = context.Request.BasicProperties.Persistent, Priority = context.Request.BasicProperties.Priority, ReplyTo = context.Request.BasicProperties.ReplyTo, Timestamp = context.Request.BasicProperties.Timestamp, Type = context.Request.BasicProperties.Type, UserId = context.Request.BasicProperties.UserId, ReplyToAddress = context.Request.BasicProperties.ReplyToAddress }; // Only TTL will be changed properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString(); await context.Channel.BasicPublishAsync( exchange: string.Empty, routingKey: delayedQueueName, mandatory: true, basicProperties: properties, body: context.Request.Body).AsTask().ConfigureAwait(false); await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false); } }
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.

using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;

namespace AcademiaPay.Messaging;

public class DelayedResult : IAMQPResult
{
    private readonly TimeSpan ttl;

    /// <summary>
    /// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
    /// </summary>
    /// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
    public DelayedResult(TimeSpan ttl)
    {
        this.ttl = ttl;
    }

    public async Task ExecuteAsync(IAmqpContext context)
    {
        ArgumentNullException.ThrowIfNull(context, nameof(context));

        var delayedQueueName = $"{context.QueueName}.delayed";

        var properties = new BasicProperties
        {
            // Copy all properties from old BasicProperties to new BasicProperties
            AppId = context.Request.BasicProperties.AppId,
            ClusterId = context.Request.BasicProperties.ClusterId,
            ContentEncoding = context.Request.BasicProperties.ContentEncoding,
            ContentType = context.Request.BasicProperties.ContentType,
            CorrelationId = context.Request.BasicProperties.CorrelationId,
            DeliveryMode = context.Request.BasicProperties.DeliveryMode,
            Expiration = context.Request.BasicProperties.Expiration,
            Headers = context.Request.BasicProperties.Headers,
            MessageId = context.Request.BasicProperties.MessageId,
            Persistent = context.Request.BasicProperties.Persistent,
            Priority = context.Request.BasicProperties.Priority,
            ReplyTo = context.Request.BasicProperties.ReplyTo,
            Timestamp = context.Request.BasicProperties.Timestamp,
            Type = context.Request.BasicProperties.Type,
            UserId = context.Request.BasicProperties.UserId,
            ReplyToAddress = context.Request.BasicProperties.ReplyToAddress
        };

        // Only TTL will be changed
        properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString();

        await context.Channel.BasicPublishAsync(
            exchange: string.Empty,
            routingKey: delayedQueueName,
            mandatory: true,
            basicProperties: properties,
            body: context.Request.Body).AsTask().ConfigureAwait(false);

        await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false);
    }
}

Aproveite…

0 comentários

Enviar um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Este site utiliza o Akismet para reduzir spam. Saiba como seus dados em comentários são processados.

Lives

Fique de olho nas lives

Fique de olho nas lives no meu canal do Youtube, no Canal .NET e nos Grupos do Facebook e Instagram.

Aceleradores

Existem diversas formas de viabilizar o suporte ao teu projeto. Seja com os treinamentos, consultoria, mentorias em grupo.