Em Julho o RabbitMQ ganhou suporte a Streams. De um lado aproxima o RabbitMQ do Kafka já que possui a mesma estrutura de dados, mas por outra ótica se isola completamente sendo provavelmente o único produto da categoria com suporte a ambos. Mas você sabia que a diferença entre Streams e Queues são enormes? Ao ponto da discussão RabbitMQ vs Kafka sequer fazer muito sentido?
Hoje entrego mais um projeto de exemplo para os alunos do curso de RabbitMQ, e dessa vez é uma demo 2 em 1. Além de apresentar RabbitMQ com NodeJS, aproveitei para entregar o suporte a Streams e demonstrar as demandas específicas para lidar com o consumo dessa estrutura, visando mostrar uma forma uniforme de trabalhar com queues e streams.
A API que lida com streams no RabbitMQ é totalmente compatível com o AMQP. Isso faz com que providers velhos consigam lidar com Streams sem necessidade de reescrita de código.
É claro que esse não é o modelo mais performático e eficiente, um modelo enxuto deve ser criado e a galera do RabbitMQ que já criou um projeto para o Cliente .NET desde os primeiros dias, embora esteja vazio.
O fato é que temos a oportunidade de trabalhar com Streams nos providers AMQP, mas as streams se comportam diferente das filas, não possuem diversos controles que as filas possuem, e por isso AMQP não é o formato mais adequado para esse tipo de trabalho.
Na prática estamos falando de remover features, fundamentais para o AMQP e para o conceito de Filas, mas que são desnecessários para as streams.
Diferenças
Distribuição entre consumidores
Com filas, cada cliente recebe uma mensagem diferente. Uma mesma mensagem nunca é distribuída para mais de um consumidor de uma única fila. Se queremos que diversos consumidores recebam uma mesma mensagem, precisamos criar tantas filas quanto o número de consumidores, fazendo com que cada um tenha a sua fila.
Já com streams, todos os consumidores de uma única stream, recebem todas as mensagens da stream.
Ack
Com filas, o ack é usado para sinalizar para o RabbitMQ que a mensagem foi processada e pode ser deletada da fila. Entre AutoAck e Ack Manual há uma diferença no momento em que o ack ocorre, mas inevitavelmente é ele quem faz o trabalho sujo de confirmar o processamento. Isso produz a exclusão da mensagem da fila.
Com streams, ack é desnecessário, visto que a natureza não destrutiva dos dados da stream faz com que seja irrelevante a presença do Ack.
Reject e NoAck
Esses sequer existem no consumo de Streams.
AMQP é ineficiente para Streams
Esses controles que citei acima fazem toda diferença. São eles que asseguram que o RabbitMQ não seja apenas uma estrutura filas, mas seja um message broker. Essas características são fundamentais para o trabalho com Filas, mas irrelevante na manipulação de streams, e esse tipo de operação não é otimizado para quem não precisa disso.
Sobre o projeto
Eu havia começado essa jornada na tentativa de usar o nestjs, mas novamente vemos uma implementação tendenciosa e ruim ao lidar com RabbitMQ. Ela deixa a desejar no quesito exchanges, criando uma única exchange e associando a uma fila genérica que recebe dados de diversas naturezas, deixando para a aplicação next, fazer o roteamento.
Minha segunda alternativa foi pensar em um desenho mais manual, e essa foi a solução que ficou melhor em termos de design.
import * as Amqp from "amqp-ts";
import Bootstraper from "./Infrastructure/Bootstraper";
import { QueueConsumerAdapter } from "./QueueConsumerAdapter";
import { StreamConsumerAdapter } from "./StreamConsumerAdapter";
// docker run -e -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management
var connection = new Amqp.Connection("amqp://localhost:5672");
const bootstrapService = new Bootstraper(connection);
bootstrapService.createEventStreamObjects();
//-------------------------------------------------------------------------
const queue =
bootstrapService.createEventSubscription(
"adapter|welcome-email", // nome da fila
"user.created" // routing key
);
const queueAdapter = new QueueConsumerAdapter(queue);
queueAdapter.start();
//-------------------------------------------------------------------------
//-------------------------------------------------------------------------
// nome da stream
const stream = bootstrapService.getQueue("event-stream");
stream.prefetch(5)
// offset consumer-tag
const streamAdapter
= new StreamConsumerAdapter(stream, "first", "gago2");
streamAdapter.start()
//-------------------------------------------------------------------------
Assim que temos um consumo de filas e streams lado-a-lado.
ConsumerBase<T>
import * as Amqp from "amqp-ts";
export default abstract class ConsumerBase<T> {
constructor(
private queue: Amqp.Queue,
private options: Amqp.Queue.ActivateConsumerOptions,
) { }
start(): Promise<Amqp.Queue.StartConsumerResult> {
//Choreography
return this.queue.activateConsumer((message) => {
try {
const typedMessage: T = message.getContent();
if (this.validate(typedMessage)) {
this.do(typedMessage);
this.onSuccess(message);
} else {
console.log('Invalid', typedMessage);
this.onInvalid(message, typedMessage);
}
} catch (e) {
console.log(e);
this.onException(message, e)
}
}, this.options);
}
stop() {
this.queue.stopConsumer();
}
abstract onException(msg: Amqp.Message, e: Error): any;
abstract onSuccess(msg: Amqp.Message): any;
abstract onInvalid(msg: Amqp.Message, typedMessage: T): any;
abstract validate(message: T): boolean;
abstract do(message: T): void;
}
O papel dessa classe é trazer uma coreografia, com o fluxo seguro e resiliente, mas sem implementá-lo.
StreamConsumerBase<T>
import * as Amqp from "amqp-ts";
import ConsumerBase from "./ConsumerBase";
export default abstract class StreamConsumerBase<T> extends ConsumerBase<T> {
constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) {
super(queue, {
arguments: {
"x-stream-offset": offset ?? "next",
},
consumerTag: consumerTag
});
}
onException(msg: Amqp.Message, e: Error): any { msg.ack(); }
onSuccess(msg: Amqp.Message): any { msg.ack(); }
onInvalid(msg: Amqp.Message, typedMessage: T): any { msg.ack(); }
}
Esse é o segundo nível de herança, aqui diferenciamos os comportamentos de Streams e Queues. Essa é a classe que lida com Streams, a versão que lida com Queues eu não coloquei aqui no post.
StreamConsumerAdapter
import * as Amqp from "amqp-ts";
import StreamConsumerBase from "./Infrastructure/MessageQueuing/StreamConsumerBase";
type MessageFormat = {
userid: string
}
export class StreamConsumerAdapter extends StreamConsumerBase<MessageFormat> {
constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) {
super(queue, offset, consumerTag);
}
validate(message: MessageFormat): boolean {
return !!message.userid;
}
do(message: MessageFormat): void {
console.log("Message received on stream: ");
console.log(message);
}
}
Por fim temos a implementação de negócio, e no nosso caso aqui, para essa demonstração eu só printei um log.
Conclusão
Com um pouco de design e pouco código conseguimos lidar com segurança dos mais variados cenários, garantindo resiliência no fluxo de Queues, mas sem perder o jeito de trabalhar, para lidar com streams.
Trazemos o melhor dos 2 mundos em um desenho bem fácil de ser evoluído e estendido.
PS: sobre o repositório, esse é material do curso, por isso não está público.








0 comentários