Em Julho o RabbitMQ ganhou suporte a Streams. De um lado aproxima o RabbitMQ do Kafka já que possui a mesma estrutura de dados, mas por outra ótica se isola completamente sendo provavelmente o único produto da categoria com suporte a ambos. Mas você sabia que a diferença entre Streams e Queues são enormes? Ao ponto da discussão RabbitMQ vs Kafka sequer fazer muito sentido?
Hoje entrego mais um projeto de exemplo para os alunos do curso de RabbitMQ, e dessa vez é uma demo 2 em 1. Além de apresentar RabbitMQ com NodeJS, aproveitei para entregar o suporte a Streams e demonstrar as demandas específicas para lidar com o consumo dessa estrutura, visando mostrar uma forma uniforme de trabalhar com queues e streams.
A API que lida com streams no RabbitMQ é totalmente compatível com o AMQP. Isso faz com que providers velhos consigam lidar com Streams sem necessidade de reescrita de código.
É claro que esse não é o modelo mais performático e eficiente, um modelo enxuto deve ser criado e a galera do RabbitMQ que já criou um projeto para o Cliente .NET desde os primeiros dias, embora esteja vazio.
O fato é que temos a oportunidade de trabalhar com Streams nos providers AMQP, mas as streams se comportam diferente das filas, não possuem diversos controles que as filas possuem, e por isso AMQP não é o formato mais adequado para esse tipo de trabalho.
Na prática estamos falando de remover features, fundamentais para o AMQP e para o conceito de Filas, mas que são desnecessários para as streams.
Diferenças
Distribuição entre consumidores
Com filas, cada cliente recebe uma mensagem diferente. Uma mesma mensagem nunca é distribuída para mais de um consumidor de uma única fila. Se queremos que diversos consumidores recebam uma mesma mensagem, precisamos criar tantas filas quanto o número de consumidores, fazendo com que cada um tenha a sua fila.
Já com streams, todos os consumidores de uma única stream, recebem todas as mensagens da stream.
Ack
Com filas, o ack é usado para sinalizar para o RabbitMQ que a mensagem foi processada e pode ser deletada da fila. Entre AutoAck e Ack Manual há uma diferença no momento em que o ack ocorre, mas inevitavelmente é ele quem faz o trabalho sujo de confirmar o processamento. Isso produz a exclusão da mensagem da fila.
Com streams, ack é desnecessário, visto que a natureza não destrutiva dos dados da stream faz com que seja irrelevante a presença do Ack.
Reject e NoAck
Esses sequer existem no consumo de Streams.
AMQP é ineficiente para Streams
Esses controles que citei acima fazem toda diferença. São eles que asseguram que o RabbitMQ não seja apenas uma estrutura filas, mas seja um message broker. Essas características são fundamentais para o trabalho com Filas, mas irrelevante na manipulação de streams, e esse tipo de operação não é otimizado para quem não precisa disso.
Sobre o projeto
Eu havia começado essa jornada na tentativa de usar o nestjs, mas novamente vemos uma implementação tendenciosa e ruim ao lidar com RabbitMQ. Ela deixa a desejar no quesito exchanges, criando uma única exchange e associando a uma fila genérica que recebe dados de diversas naturezas, deixando para a aplicação next, fazer o roteamento.
Minha segunda alternativa foi pensar em um desenho mais manual, e essa foi a solução que ficou melhor em termos de design.
import * as Amqp from "amqp-ts"; import Bootstraper from "./Infrastructure/Bootstraper"; import { QueueConsumerAdapter } from "./QueueConsumerAdapter"; import { StreamConsumerAdapter } from "./StreamConsumerAdapter"; // docker run -e -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management var connection = new Amqp.Connection("amqp://localhost:5672"); const bootstrapService = new Bootstraper(connection); bootstrapService.createEventStreamObjects(); //------------------------------------------------------------------------- const queue = bootstrapService.createEventSubscription( "adapter|welcome-email", // nome da fila "user.created" // routing key ); const queueAdapter = new QueueConsumerAdapter(queue); queueAdapter.start(); //------------------------------------------------------------------------- //------------------------------------------------------------------------- // nome da stream const stream = bootstrapService.getQueue("event-stream"); stream.prefetch(5) // offset consumer-tag const streamAdapter = new StreamConsumerAdapter(stream, "first", "gago2"); streamAdapter.start() //-------------------------------------------------------------------------
Assim que temos um consumo de filas e streams lado-a-lado.
ConsumerBase<T>
import * as Amqp from "amqp-ts"; export default abstract class ConsumerBase<T> { constructor( private queue: Amqp.Queue, private options: Amqp.Queue.ActivateConsumerOptions, ) { } start(): Promise<Amqp.Queue.StartConsumerResult> { //Choreography return this.queue.activateConsumer((message) => { try { const typedMessage: T = message.getContent(); if (this.validate(typedMessage)) { this.do(typedMessage); this.onSuccess(message); } else { console.log('Invalid', typedMessage); this.onInvalid(message, typedMessage); } } catch (e) { console.log(e); this.onException(message, e) } }, this.options); } stop() { this.queue.stopConsumer(); } abstract onException(msg: Amqp.Message, e: Error): any; abstract onSuccess(msg: Amqp.Message): any; abstract onInvalid(msg: Amqp.Message, typedMessage: T): any; abstract validate(message: T): boolean; abstract do(message: T): void; }
O papel dessa classe é trazer uma coreografia, com o fluxo seguro e resiliente, mas sem implementá-lo.
StreamConsumerBase<T>
import * as Amqp from "amqp-ts"; import ConsumerBase from "./ConsumerBase"; export default abstract class StreamConsumerBase<T> extends ConsumerBase<T> { constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) { super(queue, { arguments: { "x-stream-offset": offset ?? "next", }, consumerTag: consumerTag }); } onException(msg: Amqp.Message, e: Error): any { msg.ack(); } onSuccess(msg: Amqp.Message): any { msg.ack(); } onInvalid(msg: Amqp.Message, typedMessage: T): any { msg.ack(); } }
Esse é o segundo nível de herança, aqui diferenciamos os comportamentos de Streams e Queues. Essa é a classe que lida com Streams, a versão que lida com Queues eu não coloquei aqui no post.
StreamConsumerAdapter
import * as Amqp from "amqp-ts"; import StreamConsumerBase from "./Infrastructure/MessageQueuing/StreamConsumerBase"; type MessageFormat = { userid: string } export class StreamConsumerAdapter extends StreamConsumerBase<MessageFormat> { constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) { super(queue, offset, consumerTag); } validate(message: MessageFormat): boolean { return !!message.userid; } do(message: MessageFormat): void { console.log("Message received on stream: "); console.log(message); } }
Por fim temos a implementação de negócio, e no nosso caso aqui, para essa demonstração eu só printei um log.
Conclusão
Com um pouco de design e pouco código conseguimos lidar com segurança dos mais variados cenários, garantindo resiliência no fluxo de Queues, mas sem perder o jeito de trabalhar, para lidar com streams.
Trazemos o melhor dos 2 mundos em um desenho bem fácil de ser evoluído e estendido.
PS: sobre o repositório, esse é material do curso, por isso não está público.
0 comentários