Na noite do dia 25 de Dezembro lancçamos a versão 1 do Oragon.RabbitMQ, uma implementação de Minimal Api’s para o consumo de filas do RabbitMQ.
Com ele temos a mesma experiência de minimal api’s para o consumo de filas.
Contexto
RabbitMQ é um projeto incrível, seu client dá um suporte de primeiro nível para o .NET, entretanto exige uma série de configurações e preocupações.
Prefetch
Por exemplo, me recordo de ver um “Arquiteto NodeJS” dizendo no LinkedIn que só 1 instância de consumidor poderia consumir uma fila, ele dizia que ao subir a segunda instância, esse não receberia mensagens.
Isso não poderia estar mais errado, o que faltava na experiência dele é conhecer a configuração de prefetch, que por ignorância (desconhecimento), não conhecia, tão pouco conhecia seu impacto.
Mas esse não é um conhecimento do dia 1 desenvolvendo com RabbitMQ, a maioria daqueles que dão dos primeiros passos, nunca pensam em múltiplas instâncias, e também não chegam a se aprofundar na documentação para chegar a entender como funciona o mecanismo de prefetch, tão pouco como lidar com ele de forma eficiente.
Operar o prefetch exige interagir com o Channel (que se chamava Model até a versão 6 do provider), mais especificamente usando o BasicQoS, um método do AMQP que a maioria despreza, mas se frustra ao subir um segundo consumer.
Ack Automático
Para muitos é intuitivo pensar que o uso de “Ack automático” seja uma boa ideia, já que sabemos que Ack é uma confirmação, e sabemos que automático denota que não precisamos nos preocupar.
Ledo engano, AutoAck opera realizando Ack ao entregar a mensagem para o cliente com sucesso, confirmando e apagando a mensagem no Message Broker, assim que a mensagem é entregue ao user code.
Isso quer dizer que erros durante a execução produzem perda de mensagem, pois essa mensagem só existe na memória do processo consumidor. Obviamente, panes elétricas, queima de hardware também ocasionarão perda da mensagem.
Isso se traduz em jogar fora toda (ou quase toda) a resiliência que o message broker pode oferecer. Se sua mensagem leva 5 millisegundos, ou 25 minutos para ser processada, pouco importa com autoack, antes do seu código tocar a mensagem, ela já foi deletada do Message Broker.
Ack manual
Ack Automático vs Ack Manual tem 1 linha de código de diferença, mas só pelo fato dela assumirmos o controle, parece estamos fazendo a coisa errada. Seja por insegurança, seja por ignorância ou mesmo por vício. O “automático” sempre parece uma melhor opção a mais inteligente, e até uma solução que não demanda justificativas.
Essa é uma percepção muito errada, porém comum.
Cardinalidade entre models e consumers
Outro ponto de confusão é o uso de Connections vs Channels (antigamente Models) e a necessidade de um Channel exclusivo por consumidor.
Por mais que publicar exporadicamente uma mensagem, possa ser considerado uma tarefa boba, simples, usar o mesmo channel de um consumer que já possui um consumer, embora funcione sem exceptions, não é nem certo, e nem deveria ser uma opção. No passado a documentação era mais taxativa quanto a isso, hoje ela sugere, quase que com medo de tocar no assunto.
Reject ou Nack
Outra dúvida comum é sobre o uso de Reject ou Nack para negar o processamento. Essa é a típica configuração que gera tanta confusão quanto o uso de CMD vs Entrypoint no Docker.
No final das contas Nack ou Reject representam negação de processamento. Uma por rejeição, outra por incapacidade (momentanea ou não). Elas alimentam métricas diferentes, mas possuem a mesma função e comportamento.
São muitos detalhes
A maioria das abstrações que existem hoje empenham absolutoi esforço em permitir multiplos mecanismos de filas, criando uma indireção, permitindo que um mesmo código consuma RabbitMQ, Azure Service Bus, SQS e Redis.
Quando se cria abstrações poliglotas dessa forma, “o vocabulário” da abstração é reduzido e simplificado para o que é comum à todas as linguas. Em termos práticos, as abstrações suprimem comportamentos e configurações peculiares a cada mecanismo, oferecendo uma camada com o básico.
Esse básico muitas das vezes remove capacidades especiais, adiciona comportamentos duplicados e inseguros, como é o caso do MassTransit quando opta por criar manualmente fila de dead-letter e implementar deadletter manualmente, sem usar a infra do RabbitMQ para lidar com isso. Note, esse “problema” é falta de configuração, entretanto nunca vi um projeto usando Masstransit tocando nessas configurações.
A necessidade de fugir/delargar da responsabilidade, faz com que erros graves assim aconteçam.
Resultado: Projetos depositam confiança no RabbitMQ mas seus desenvolvedores sabotam toda a resiliência com decisões ruins no código.
De onde tirei isso?
Meu primeiro uso do RabbitMQ foi em 2013, desde então nunca mais parei, são 11 anos. Foram vários projetos implantando RabbitMQ. Desde então tenho falado sobre RabbitMQ consistentemente todos os anos, todos os meses. Além das milhares de horas de suporte à comunidade no telegram, tenho meu curso de RabbitMQ para desenvolvedores .NET, que já conta com aproximadamente 470 pessoas e algumas dezenas de horas de conteúdo sobre RabbitMQ e Mensageria no Youtube.
Oragon.RabbitMQ
O Oragon.RabbitMQ se trata de uma implementação opinativa de Minimal API para o RabbitMQ.
A implementação possui uma série de facilitadores para que configurar as coisas mais complexas se torne absolutamente simples e acessível.
Por exemplo:
/* ... */ app.MapQueue("events-managed", ([FromServices] EmailService svc, DoSomethingCommand cmd) => svc.DoSomethingAsync(cmd).ConfigureAwait(false)); /* ... */
/* ... */ builder.Services.AddKeyedTransient<IEmailService, XptoAAAEmailService>("email-service-1"); builder.Services.AddKeyedTransient<IEmailService, XptoBBBEmailService>("email-service-2"); /* ... */ app.MapQueue("events-managed", ([FromServices("email-service-2")] IEmailService svc, DoSomethingCommand cmd) => svc.DoSomethingAsync(cmd).ConfigureAwait(false)); /* ... */
/* ... */ app.MapQueue("events-managed", ([FromServices] EmailService svc, DoSomethingCommand cmd) => svc.DoSomethingAsync(cmd).ConfigureAwait(false)) .WithPrefetch(2000); /* ... */
/* ... */ app.MapQueue("events-managed", ([FromServices] EmailService svc, DoSomethingCommand cmd) => svc.DoSomethingAsync(cmd).ConfigureAwait(false)) .WithPrefetch(2000) .WithDispatchConcurrency(2); /* ... */
/* ... */ app.MapQueue("events-managed", ([FromServices] EmailService svc, DoSomethingCommand cmd) => { IAMQPResult returnValue; if (svc.CanXpto(msg)) { await svc.DoXptoAsync(msg); returnValue = new AckResult(); } else { returnValue = new RejectResult(requeue: true); } return returnValue; }) .WithPrefetch(2000) .WithDispatchConcurrency(2); /* ... */
/* ... */ app.MapQueue("events-managed", (DoSomethingCommand cmd) => { IAMQPResult returnValue; if (staticClass.CanXpto(msg)) { await staticClass.DoXptoAsync(msg); returnValue = new AckResult(); } else { returnValue = new RejectResult(requeue: true); } return returnValue; }) .WithPrefetch(2000) .WithDispatchConcurrency(2); /* ... */
/* ... */ app.MapQueue("events-managed", staticClass.DoXptoAsync) .WithPrefetch(2000) .WithDispatchConcurrency(2); /* ... */
/* ... */ app.MapQueue("events-managed", staticClass.DoXpto) .WithPrefetch(2000) .WithDispatchConcurrency(2); /* ... */
Fluxo Automático
Para todo retorno não for dos tipos IAMQPResult ou Task<IAMQPResult>, o fluxo automático entra em ação.
Em caso de sucesso (auxência de exception), o fluxo automático executará um BasicAck.
Em caso de insucesso, quando uma exception é lançada), o fluxo automático executará um BasicNack com Requeue = false.
Isso nos obriga a configurar deadletter para a fila, para evitar perda de dados.
Fluxo Manual
Para todo retorno dos tipos IAMQPResult ou Task<IAMQPResult>, o fluxo está sobre controle da instância do IAMQPResult retornado.
O IAMQPResult pode ser:
- AckResult – Realiza BasicAck confirmando o processamento com sucesso.
- NackResult – Realiza o BasicNack negando o processamento.
- RejectResult – Realiza o BasicReject negando o processamento.
- ReplyResult – Experimental – No construtor recebe um objeto para ser publicado como resposta. Como endereço de resposta usa-se como Routing Key o valor de ReplyTo da mensagem recebida, usando a exchange direct para realizar esse envio. Portanto nessa implementação espera-se uma resposta no mesmo Virtual Host, e que no ReplyTo tenhamos o nome de uma fila.
- ComposableResult – Usado para compor multiplas operações, como Ack + Reply, Nack + Reply, Reject + Reply.
NackResult e RejectResult possuem o argumento bool requeue
no construtor, para determinar se a mensagem deve ou não ser reenfileirada.
Serialização e Reject
Temos 2 serializadores implementados, tanto com System.Text.Json e com Newtonsoft.Json.
O fluxo hoje rejeita com requeue=false toda vez que há um erro na desserialização.
Ainda estou estudando a possibilidade de tornar opcional a desserialização, pensando que possa ser opcionalmente delegada para o user code.
Cuidados com Performance
Todos os cuidados foram tomados para evitar processamento durante o consumo de cada mensagem, e para isso medidas foram tomadas para antecipar decisões, principalmente as que envolvem o model binding e dispatching e tratadores de resposta, assim toda introspeção reside no startup do projeto, sendo executada uma única vez, permitindo que o fluxo de consumo de mensagens não tenha nenhuma introspecção adicional.
Comparativo
No exemplo com .NET Aspire, fiz questão de criar uma estrutura com uma exchange fanout que duplicasse cada mensagens em 2 filas. Com essa abordagem a quantidade de mensagens que chegaria a cada fila seria exatamente a mesma, e portanto conseguiria comparar o desempenho entre diversos consumidores.
Adotei um consumidor que chamei de gerenciado, usando a infraestrutura do Oragon.RabbitMQ e outro usando um consumidor nativo do RabbitMQ, adotando as mesmas práticas, porém sem nenhuma camada de abstração.
Embora fosse provável que o consumidor gerenciado trouxesse perdas, não é possível constatar tais perdas. Por enquanto.
Ainda estou sem parte da minha infraestrutura (meu desktop em especial), em breve vou fazer um teste em uma infraestrutura mais robusta afim de chegar
Suporte antecipado ao RabbitMQ.Client 7 no .NET Aspire
Para quem está usando .NET Aspire, a biblioteca cliente Aspire.RabbitMQ.Client ainda não dá suporte ao RabbitMQ.Client 7.0, portanto foi necessário criar uma biblioteca de transição temporária Oragon.RabbitMQ.AspireClient para endereçar essa necessidade.
Na versão 7.x do RabbitMQ.Client muita coisa mudou e todas as versões anteriores são incompatíveis com essa nova versão, portanto utilizar o client padrão do .NET Aspire trará consigo o RabbitMQ.Cliente 6.x que não funcionará.
Versões do .NET
Todas as bibliotecas dão suporte a .NET 8 e .NET 9.
Ainda estou estudando a possibilidade de suporte a outras versões do .NET.
Enjoy
Aproveite, use, e dê feedbacks!
O projeto está no GitHub em github.com/luizcarlosfaria/Oragon.RabbitMQ/
0 comentários