Implementando Delay no RabbitMQ com Oragon.RabbitMQ sem Delayed Exchange
Um assunto recorrente quando falamos de RabbitMQ é a implementação de delay.
Embora tenhamos o delayed exchange, existem questões relacionadas a cluster que precisam ser consideradas e por isso não é algo que aborde, além de ser algo instalado à parte.
Existem muitos motivos para evitarmos esse tipo de exchange e na apresentação do Clayton Cavaleiro sobre “Servindo 1MM de mensagens simultâneas com RabbitMQ” vimos alguns desses problemas.
Assim vamos abordar uma solução alternativa, usando uma fila auxiliar.
Entendendo Oragon.RabbitMQ
O projeto consiste em implementar Minimal API’s para consumir filas do RabbitMQ, e aqui temos a diferença entre um consumidor normal e a implementação com Oragon.RabbitMQ.
A criação de AMQPResults que dão suporte à essa implementação sustentou a possibilidade de um AMQPResult capaz de enviar uma mensagem para uma fila de delay.
É um modelo similar ao encontrado nas Mininal API’s, e copiei para entregar a mesma experiência de uso.
Dessa forma, o MapQueue permite que você lide na Minimal API com os tratamentos necessários, sem proliferar, propagar e assim ferir as responsabilidades da sua camada de serviço/negócio com conteúdo relacionado ao RabbitMQ.
A implementação do DelayedResult é uma implementação que não faz parte da library, faz parte do ACADEMIA PAY e está licenciado sob MIT (licença permissiva) e é uma das possibilidades de uso.
Abaixo segue o código de exemplo.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.
using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
namespace AcademiaPay.Messaging;
public class DelayedResult : IAMQPResult
{
private readonly TimeSpan ttl;
/// <summary>
/// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
/// </summary>
/// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
public DelayedResult(TimeSpan ttl)
{
this.ttl = ttl;
}
public async Task ExecuteAsync(IAmqpContext context)
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.
using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
namespace AcademiaPay.Messaging;
public class DelayedResult : IAMQPResult
{
private readonly TimeSpan ttl;
/// <summary>
/// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
/// </summary>
/// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
public DelayedResult(TimeSpan ttl)
{
this.ttl = ttl;
}
public async Task ExecuteAsync(IAmqpContext context)
{
ArgumentNullException.ThrowIfNull(context, nameof(context));
var delayedQueueName = $"{context.QueueName}.delayed";
var properties = new BasicProperties
{
// Copy all properties from old BasicProperties to new BasicProperties
AppId = context.Request.BasicProperties.AppId,
ClusterId = context.Request.BasicProperties.ClusterId,
ContentEncoding = context.Request.BasicProperties.ContentEncoding,
ContentType = context.Request.BasicProperties.ContentType,
CorrelationId = context.Request.BasicProperties.CorrelationId,
DeliveryMode = context.Request.BasicProperties.DeliveryMode,
Expiration = context.Request.BasicProperties.Expiration,
Headers = context.Request.BasicProperties.Headers,
MessageId = context.Request.BasicProperties.MessageId,
Persistent = context.Request.BasicProperties.Persistent,
Priority = context.Request.BasicProperties.Priority,
ReplyTo = context.Request.BasicProperties.ReplyTo,
Timestamp = context.Request.BasicProperties.Timestamp,
Type = context.Request.BasicProperties.Type,
UserId = context.Request.BasicProperties.UserId,
ReplyToAddress = context.Request.BasicProperties.ReplyToAddress
};
// Only TTL will be changed
properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString();
await context.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: delayedQueueName,
mandatory: true,
basicProperties: properties,
body: context.Request.Body).AsTask().ConfigureAwait(false);
await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false);
}
}
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements.
// The ACADEMIA.DEV licenses this file to you under the MIT license.
using global::Oragon.RabbitMQ.Consumer.Actions;
using global::Oragon.RabbitMQ.Consumer;
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
namespace AcademiaPay.Messaging;
public class DelayedResult : IAMQPResult
{
private readonly TimeSpan ttl;
/// <summary>
/// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL.
/// </summary>
/// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param>
public DelayedResult(TimeSpan ttl)
{
this.ttl = ttl;
}
public async Task ExecuteAsync(IAmqpContext context)
{
ArgumentNullException.ThrowIfNull(context, nameof(context));
var delayedQueueName = $"{context.QueueName}.delayed";
var properties = new BasicProperties
{
// Copy all properties from old BasicProperties to new BasicProperties
AppId = context.Request.BasicProperties.AppId,
ClusterId = context.Request.BasicProperties.ClusterId,
ContentEncoding = context.Request.BasicProperties.ContentEncoding,
ContentType = context.Request.BasicProperties.ContentType,
CorrelationId = context.Request.BasicProperties.CorrelationId,
DeliveryMode = context.Request.BasicProperties.DeliveryMode,
Expiration = context.Request.BasicProperties.Expiration,
Headers = context.Request.BasicProperties.Headers,
MessageId = context.Request.BasicProperties.MessageId,
Persistent = context.Request.BasicProperties.Persistent,
Priority = context.Request.BasicProperties.Priority,
ReplyTo = context.Request.BasicProperties.ReplyTo,
Timestamp = context.Request.BasicProperties.Timestamp,
Type = context.Request.BasicProperties.Type,
UserId = context.Request.BasicProperties.UserId,
ReplyToAddress = context.Request.BasicProperties.ReplyToAddress
};
// Only TTL will be changed
properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString();
await context.Channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: delayedQueueName,
mandatory: true,
basicProperties: properties,
body: context.Request.Body).AsTask().ConfigureAwait(false);
await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false);
}
}
Para o mecanismo funcionar, a fila de delay precisa ter como deadletter uma rota válida para a fila de processamento.
Para esse mecanismo funcionar é necessário criar uma fila auxiliar para cada fila que possua delay.
0 comentários