fbpx
RabbitMQ Streams com TypeScript e NodeJS
Publicado em: sexta-feira, 10 de set de 2021
Categorias: RabbitMQ de A a Z

Em Julho o RabbitMQ ganhou suporte a Streams. De um lado aproxima o RabbitMQ do Kafka já que possui a mesma estrutura de dados, mas por outra ótica se isola completamente sendo provavelmente o único produto da categoria com suporte a ambos. Mas você sabia que a diferença entre Streams e Queues são enormes? Ao ponto da discussão RabbitMQ vs Kafka sequer fazer muito sentido?

Hoje entrego mais um projeto de exemplo para os alunos do curso de RabbitMQ, e dessa vez é uma demo 2 em 1. Além de apresentar RabbitMQ com NodeJS, aproveitei para entregar o suporte a Streams e demonstrar as demandas específicas para lidar com o consumo dessa estrutura, visando mostrar uma forma uniforme de trabalhar com queues e streams.

A API que lida com streams no RabbitMQ é totalmente compatível com o AMQP. Isso faz com que providers velhos consigam lidar com Streams sem necessidade de reescrita de código.

É claro que esse não é o modelo mais performático e eficiente, um modelo enxuto deve ser criado e a galera do RabbitMQ que já criou um projeto para o Cliente .NET desde os primeiros dias, embora esteja vazio.

O fato é que temos a oportunidade de trabalhar com Streams nos providers AMQP, mas as streams se comportam diferente das filas, não possuem diversos controles que as filas possuem, e por isso AMQP não é o formato mais adequado para esse tipo de trabalho.

Na prática estamos falando de remover features, fundamentais para o AMQP e para o conceito de Filas, mas que são desnecessários para as streams.

Diferenças

Distribuição entre consumidores

Com filas, cada cliente recebe uma mensagem diferente. Uma mesma mensagem nunca é distribuída para mais de um consumidor de uma única fila. Se queremos que diversos consumidores recebam uma mesma mensagem, precisamos criar tantas filas quanto o número de consumidores, fazendo com que cada um tenha a sua fila.

Já com streams, todos os consumidores de uma única stream, recebem todas as mensagens da stream.

Ack

Com filas, o ack é usado para sinalizar para o RabbitMQ que a mensagem foi processada e pode ser deletada da fila. Entre AutoAck e Ack Manual há uma diferença no momento em que o ack ocorre, mas inevitavelmente é ele quem faz o trabalho sujo de confirmar o processamento. Isso produz a exclusão da mensagem da fila.

Com streams, ack é desnecessário, visto que a natureza não destrutiva dos dados da stream faz com que seja irrelevante a presença do Ack.

Reject e NoAck

Esses sequer existem no consumo de Streams.

AMQP é ineficiente para Streams

Esses controles que citei acima fazem toda diferença. São eles que asseguram que o RabbitMQ não seja apenas uma estrutura filas, mas seja um message broker. Essas características são fundamentais para o trabalho com Filas, mas irrelevante na manipulação de streams, e esse tipo de operação não é otimizado para quem não precisa disso.

Sobre o projeto

Eu havia começado essa jornada na tentativa de usar o nestjs, mas novamente vemos uma implementação tendenciosa e ruim ao lidar com RabbitMQ. Ela deixa a desejar no quesito exchanges, criando uma única exchange e associando a uma fila genérica que recebe dados de diversas naturezas, deixando para a aplicação next, fazer o roteamento.

Minha segunda alternativa foi pensar em um desenho mais manual, e essa foi a solução que ficou melhor em termos de design.

import * as Amqp from "amqp-ts";
import Bootstraper from "./Infrastructure/Bootstraper";
import { QueueConsumerAdapter } from "./QueueConsumerAdapter";
import { StreamConsumerAdapter } from "./StreamConsumerAdapter";

// docker run -e -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management
var connection = new Amqp.Connection("amqp://localhost:5672");

const bootstrapService = new Bootstraper(connection);

bootstrapService.createEventStreamObjects();

//-------------------------------------------------------------------------
const queue = 
    bootstrapService.createEventSubscription(
         "adapter|welcome-email",  // nome da fila  
          "user.created"           // routing key
     );

const queueAdapter = new QueueConsumerAdapter(queue);
queueAdapter.start();
//-------------------------------------------------------------------------

//-------------------------------------------------------------------------
//                                       nome da stream
const stream = bootstrapService.getQueue("event-stream");
stream.prefetch(5)
//                                              offset     consumer-tag
const streamAdapter 
           = new StreamConsumerAdapter(stream,  "first",    "gago2");
streamAdapter.start()
//-------------------------------------------------------------------------

Assim que temos um consumo de filas e streams lado-a-lado.


ConsumerBase<T>

import * as Amqp from "amqp-ts";

export default abstract class ConsumerBase<T> {

    constructor(
        private queue: Amqp.Queue,
        private options: Amqp.Queue.ActivateConsumerOptions,
    ) { }

    start(): Promise<Amqp.Queue.StartConsumerResult> {
        //Choreography
        return this.queue.activateConsumer((message) => {
            try {
                const typedMessage: T = message.getContent();
                if (this.validate(typedMessage)) {
                    this.do(typedMessage);
                    this.onSuccess(message);
                } else {
                    console.log('Invalid', typedMessage);
                    this.onInvalid(message, typedMessage);
                }
            } catch (e) {
                console.log(e);
                this.onException(message, e)
            }
        }, this.options);
    }
    stop() {
        this.queue.stopConsumer();
    }

    abstract onException(msg: Amqp.Message, e: Error): any;
    abstract onSuccess(msg: Amqp.Message): any;
    abstract onInvalid(msg: Amqp.Message, typedMessage: T): any;
    abstract validate(message: T): boolean;
    abstract do(message: T): void;
}

O papel dessa classe é trazer uma coreografia, com o fluxo seguro e resiliente, mas sem implementá-lo.


StreamConsumerBase<T>

import * as Amqp from "amqp-ts";
import ConsumerBase from "./ConsumerBase";

export default abstract class StreamConsumerBase<T> extends ConsumerBase<T> {

    constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) {
        super(queue, {
            arguments: {
                "x-stream-offset": offset ?? "next",
            },
            consumerTag: consumerTag
        });
    }

    onException(msg: Amqp.Message, e: Error): any { msg.ack(); }

    onSuccess(msg: Amqp.Message): any { msg.ack(); }

    onInvalid(msg: Amqp.Message, typedMessage: T): any { msg.ack(); }

}

Esse é o segundo nível de herança, aqui diferenciamos os comportamentos de Streams e Queues. Essa é a classe que lida com Streams, a versão que lida com Queues eu não coloquei aqui no post.


StreamConsumerAdapter

import * as Amqp from "amqp-ts";
import StreamConsumerBase from "./Infrastructure/MessageQueuing/StreamConsumerBase";

type MessageFormat = {
    userid: string
}

export class StreamConsumerAdapter extends StreamConsumerBase<MessageFormat> {

    constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) {
        super(queue, offset, consumerTag);
    }

    validate(message: MessageFormat): boolean {
        return !!message.userid;
    }

    do(message: MessageFormat): void {
        console.log("Message received on stream: ");
        console.log(message);
    }

}

Por fim temos a implementação de negócio, e no nosso caso aqui, para essa demonstração eu só printei um log.

Conclusão

Com um pouco de design e pouco código conseguimos lidar com segurança dos mais variados cenários, garantindo resiliência no fluxo de Queues, mas sem perder o jeito de trabalhar, para lidar com streams.

Trazemos o melhor dos 2 mundos em um desenho bem fácil de ser evoluído e estendido.

PS: sobre o repositório, esse é material do curso, por isso não está público.

A MasterClass virou Curso

Todo o conteúdo da MasterClass virou Bonus!

Além de mais 3 outros bonus incríveis para ajudar todos que precisam de um up na carreira.

RabbitMQ Newsletter

Novidades e ofertas de conteúdo exclusivo e único no Brasil.

Em 2021 fizemos uma MasterClass em Janeiro, agora em Junho está rolando um treinamento.

RabbitMQ é um assunto que me agrada muito, pois sua capacidade de entregar resultado em projetos de todos os tamanhos é um divisor de águas. Não é tudo que conseguimos usar e tirar proveito em ambientes menores.

A primeira vez que usei RabbitMQ foi em 2013. Desde então nunca mais deixei de usar.

Luiz Carlos Faria

Mensagem do Autor

Espero que goste desse post. Não deixe de comentar e falar o que achou. 

Se acha que esse post pode ajudar alguém que você conheça, compartilhe!

 

Gostou do post?

Esse post é uma fração do que eu posso te ajudar em relação a RabbitMQ.

Venha conhecer o que RabbitMQ pode fazer por você e seus projetos.

Se você quer entregar mais Disponibilidade, Eficiência, Resiliência, Confiabilidade e/ou Escalabilidade, com aplicações .NET, esse curso vai te ajudar a conquistar o sucesso da sua implantação.

Categorias

Assine

0 comentários

Enviar um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.

Lives

Fique de olho nas lives

Fique de olho nas lives no meu canal do Youtube, no Canal .NET e nos Grupos do Facebook e Instagram.

Aceleradores

Existem diversas formas de viabilizar o suporte ao teu projeto. Seja com os treinamentos, consultoria, mentorias em grupo.