fbpx
RabbitMQ Streams com TypeScript e NodeJS
Publicado em: sexta-feira, 10 de set de 2021
Categorias: RabbitMQ de A a Z

Em Julho o RabbitMQ ganhou suporte a Streams. De um lado aproxima o RabbitMQ do Kafka já que possui a mesma estrutura de dados, mas por outra ótica se isola completamente sendo provavelmente o único produto da categoria com suporte a ambos. Mas você sabia que a diferença entre Streams e Queues são enormes? Ao ponto da discussão RabbitMQ vs Kafka sequer fazer muito sentido?

Hoje entrego mais um projeto de exemplo para os alunos do curso de RabbitMQ, e dessa vez é uma demo 2 em 1. Além de apresentar RabbitMQ com NodeJS, aproveitei para entregar o suporte a Streams e demonstrar as demandas específicas para lidar com o consumo dessa estrutura, visando mostrar uma forma uniforme de trabalhar com queues e streams.

A API que lida com streams no RabbitMQ é totalmente compatível com o AMQP. Isso faz com que providers velhos consigam lidar com Streams sem necessidade de reescrita de código.

É claro que esse não é o modelo mais performático e eficiente, um modelo enxuto deve ser criado e a galera do RabbitMQ que já criou um projeto para o Cliente .NET desde os primeiros dias, embora esteja vazio.

O fato é que temos a oportunidade de trabalhar com Streams nos providers AMQP, mas as streams se comportam diferente das filas, não possuem diversos controles que as filas possuem, e por isso AMQP não é o formato mais adequado para esse tipo de trabalho.

Na prática estamos falando de remover features, fundamentais para o AMQP e para o conceito de Filas, mas que são desnecessários para as streams.

Diferenças

Distribuição entre consumidores

Com filas, cada cliente recebe uma mensagem diferente. Uma mesma mensagem nunca é distribuída para mais de um consumidor de uma única fila. Se queremos que diversos consumidores recebam uma mesma mensagem, precisamos criar tantas filas quanto o número de consumidores, fazendo com que cada um tenha a sua fila.

Já com streams, todos os consumidores de uma única stream, recebem todas as mensagens da stream.

Ack

Com filas, o ack é usado para sinalizar para o RabbitMQ que a mensagem foi processada e pode ser deletada da fila. Entre AutoAck e Ack Manual há uma diferença no momento em que o ack ocorre, mas inevitavelmente é ele quem faz o trabalho sujo de confirmar o processamento. Isso produz a exclusão da mensagem da fila.

Com streams, ack é desnecessário, visto que a natureza não destrutiva dos dados da stream faz com que seja irrelevante a presença do Ack.

Reject e NoAck

Esses sequer existem no consumo de Streams.

AMQP é ineficiente para Streams

Esses controles que citei acima fazem toda diferença. São eles que asseguram que o RabbitMQ não seja apenas uma estrutura filas, mas seja um message broker. Essas características são fundamentais para o trabalho com Filas, mas irrelevante na manipulação de streams, e esse tipo de operação não é otimizado para quem não precisa disso.

Sobre o projeto

Eu havia começado essa jornada na tentativa de usar o nestjs, mas novamente vemos uma implementação tendenciosa e ruim ao lidar com RabbitMQ. Ela deixa a desejar no quesito exchanges, criando uma única exchange e associando a uma fila genérica que recebe dados de diversas naturezas, deixando para a aplicação next, fazer o roteamento.

Minha segunda alternativa foi pensar em um desenho mais manual, e essa foi a solução que ficou melhor em termos de design.

import * as Amqp from "amqp-ts";
import Bootstraper from "./Infrastructure/Bootstraper";
import { QueueConsumerAdapter } from "./QueueConsumerAdapter";
import { StreamConsumerAdapter } from "./StreamConsumerAdapter";

// docker run -e -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management
var connection = new Amqp.Connection("amqp://localhost:5672");

const bootstrapService = new Bootstraper(connection);

bootstrapService.createEventStreamObjects();

//-------------------------------------------------------------------------
const queue = 
    bootstrapService.createEventSubscription(
         "adapter|welcome-email",  // nome da fila  
          "user.created"           // routing key
     );

const queueAdapter = new QueueConsumerAdapter(queue);
queueAdapter.start();
//-------------------------------------------------------------------------

//-------------------------------------------------------------------------
//                                       nome da stream
const stream = bootstrapService.getQueue("event-stream");
stream.prefetch(5)
//                                              offset     consumer-tag
const streamAdapter 
           = new StreamConsumerAdapter(stream,  "first",    "gago2");
streamAdapter.start()
//-------------------------------------------------------------------------

Assim que temos um consumo de filas e streams lado-a-lado.


ConsumerBase<T>

import * as Amqp from "amqp-ts";

export default abstract class ConsumerBase<T> {

    constructor(
        private queue: Amqp.Queue,
        private options: Amqp.Queue.ActivateConsumerOptions,
    ) { }

    start(): Promise<Amqp.Queue.StartConsumerResult> {
        //Choreography
        return this.queue.activateConsumer((message) => {
            try {
                const typedMessage: T = message.getContent();
                if (this.validate(typedMessage)) {
                    this.do(typedMessage);
                    this.onSuccess(message);
                } else {
                    console.log('Invalid', typedMessage);
                    this.onInvalid(message, typedMessage);
                }
            } catch (e) {
                console.log(e);
                this.onException(message, e)
            }
        }, this.options);
    }
    stop() {
        this.queue.stopConsumer();
    }

    abstract onException(msg: Amqp.Message, e: Error): any;
    abstract onSuccess(msg: Amqp.Message): any;
    abstract onInvalid(msg: Amqp.Message, typedMessage: T): any;
    abstract validate(message: T): boolean;
    abstract do(message: T): void;
}

O papel dessa classe é trazer uma coreografia, com o fluxo seguro e resiliente, mas sem implementá-lo.


StreamConsumerBase<T>

import * as Amqp from "amqp-ts";
import ConsumerBase from "./ConsumerBase";

export default abstract class StreamConsumerBase<T> extends ConsumerBase<T> {

    constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) {
        super(queue, {
            arguments: {
                "x-stream-offset": offset ?? "next",
            },
            consumerTag: consumerTag
        });
    }

    onException(msg: Amqp.Message, e: Error): any { msg.ack(); }

    onSuccess(msg: Amqp.Message): any { msg.ack(); }

    onInvalid(msg: Amqp.Message, typedMessage: T): any { msg.ack(); }

}

Esse é o segundo nível de herança, aqui diferenciamos os comportamentos de Streams e Queues. Essa é a classe que lida com Streams, a versão que lida com Queues eu não coloquei aqui no post.


StreamConsumerAdapter

import * as Amqp from "amqp-ts";
import StreamConsumerBase from "./Infrastructure/MessageQueuing/StreamConsumerBase";

type MessageFormat = {
    userid: string
}

export class StreamConsumerAdapter extends StreamConsumerBase<MessageFormat> {

    constructor(queue: Amqp.Queue, offset: any, consumerTag?: string) {
        super(queue, offset, consumerTag);
    }

    validate(message: MessageFormat): boolean {
        return !!message.userid;
    }

    do(message: MessageFormat): void {
        console.log("Message received on stream: ");
        console.log(message);
    }

}

Por fim temos a implementação de negócio, e no nosso caso aqui, para essa demonstração eu só printei um log.

Conclusão

Com um pouco de design e pouco código conseguimos lidar com segurança dos mais variados cenários, garantindo resiliência no fluxo de Queues, mas sem perder o jeito de trabalhar, para lidar com streams.

Trazemos o melhor dos 2 mundos em um desenho bem fácil de ser evoluído e estendido.

PS: sobre o repositório, esse é material do curso, por isso não está público.

Você pediu e agora virou curso. Mensageria .NET é minha formação de especialista em RabbitMQ com .NET, onde ensino RabbitMQ do básico, cada fundamento, cada detalhe, ao avançado.

Onde você vai sair do zero absoluto e vai conseguir criar, projetar e corrigir soluções .NET com RabbitMQ.

Além de contar mais 3 outros bonus incríveis para ajudar todos que precisam de um up na carreira.

RabbitMQ Newsletter

Novidades e ofertas de conteúdo exclusivo e único no Brasil.

Hoje com orgulho somos referência quando se fala em RabbitMQ com .NET.

São quase 10 anos usando RabbitMQ em projetos .NET com C#, implantando, convencendo times e mostrando o caminho para aslcançar sos 5 benefícios.

Após centenas de pedidos, criei um curso dedicado aos profissionais .NET. 

Aqui nessa newsletter eu te entrego promoções e links especiais! Cola aqui, tem muita coisa legal!

Luiz Carlos Faria

Meu primeiro contato com RabbitMQ foi em 2013.

Eu estava sozinho na definição de uma arquitetura para a reestruturação de uma integração enquanto meu time estava ocupado com o dia-a-dia.

Naquela época eu precisava de apenas 1 ou 2 recursos que o RabbitMQ entregava.

Nas primeiras semanas e meses em produção pude perceber coisas que não estavam escritas em lugar algum, benefícios e formas de uso das quais poderiam resolver anos de frustração.

Desde então RabbitMQ tem sido meu aliado na restruturação de projetos dos mais variados.

E por mais simples que seja, ainda é possível, 10 anos depois, gerar surpresas com novas abordagens que geram novos benefícios.

7 dias

É tudo que precisa para sair do zero, à produção!

Com conforto, com segurança, e com apoio.

Desde que você já seja um desenvolvedor profissional.

Se você quer entregar mais Disponibilidade, Eficiência, Resiliência, Confiabilidade e/ou Escalabilidade, em projetos .NET, aqui é o seu lugar.

0 comentários

Enviar um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.

Lives

Fique de olho nas lives

Fique de olho nas lives no meu canal do Youtube, no Canal .NET e nos Grupos do Facebook e Instagram.

Aceleradores

Existem diversas formas de viabilizar o suporte ao teu projeto. Seja com os treinamentos, consultoria, mentorias em grupo.