RabbitMQ & AMQP – #6 – Show me the code
Publicado em: segunda-feira, 25 de fev de 2019
Categorias: RabbitMQ de A a Z
Tags: AMQP | RabbitMQ

No post anterior eu mostrei como as coisas acontecem na interface de gerenciamento do RabbitMQ. Agora é hora de mostrar as principais iterações com o RabbitMQ via código.

Naturalmente, privilegiarei C# aqui, mas você pode usar as principais tecnologias para realizar as tarefas a seguir. O objetivo desse post é lhe ambientar, já que estamos tentando falar em alto nível, se você nunca viu RabbitMQ então irá te ajudar a entender como as coisas funcionam.

Pacote Nuget

Todas as dependências do RabbitMQ são carregadas a partir desse pacote: https://www.nuget.org/packages/RabbitMQ.Client

Enviando uma mensagem para uma fila

Criar uma fila manualmente pela interface ou criar dinamicamente via código faz parte da estratégia que você vai definir. Não há limitação em nenhum dos dos lados, você precisa fazer escolhas que melhor se adequam ao seu perfil de uso.

É muito importante ficar claro que haverá cenário em que você criará filas temporárias, criadas dinamicamente via código. É o fundamento do fluxo de RPC sobre filas, por exemplo.

Fila previamente criada

Nesse exemplo, tirado da documentação, temos a publicação de uma mensagem usando a exchange direct, enviando a mensagem para a fila hello.

using System;
using RabbitMQ.Client;
using System.Text;

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);

        }

    }
}

 

Nova fila

Já nesse caso, temos a criação da fila antes do envio.

using System;
using RabbitMQ.Client;
using System.Text;

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
        }
    }
}

 

Consumindo mensagens de uma fila

Agora temos um exemplo COM AUTOACK. É o exemplo que encontramos na documentação. Você sabe que não usará AutoAck nunca né?!  De qualquer forma deixei aqui para poder falar nisso.

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);               
            };

            channel.BasicConsume(queue: "hello",
                                 autoAck: true,
                                 consumer: consumer);

            Console.ReadLine();
        }
    }
}

Note que é o consumo do Evento Receive que efetivamente é encarregado por processar a mensagem que veio de uma fila.

Outro ponto de atenção é que o código está desempacotando o body da mensagem. Isso quer dizer que não há automação nesse processo. Se você serializar um objeto em json, terá de deserializar. Não é responsabilidade do RabbitMQ fazer isso por você, mas os envelopes de mensagem (essa message que você recebe, é um envelope) possuem cabeçalhos e são bons para expressar.

Sobre o acknowledgment

Já no segundo tutorial temos:

Message acknowledgment

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren’t any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It’s fine even if processing a message takes a very, very long time.

Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off by setting the autoAck (“automatic acknowledgement mode”) parameter to true. It’s time to remove this flag and manually send a proper acknowledgment from the worker, once we’re done with a task.

Tradução + TL;DR;

Se usar autoack vai se lascar todo!

Brincadeiras à parte, autoack pode te custar muito caro, e o problema é que quem faz autoack faz antes da mensagem ser processada. Isso quer dizer que quem o fará, não sabe se a mensagem foi processada ou não. Funciona como uma confirmação de entrega sem confirmação de recebimento. Enfim, lhe permite construir uma solida arquitetura distribuída sob um castelo de areia.

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
				
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: "hello",
                                 autoAck: false,
                                 consumer: consumer);

            Console.ReadLine();
        }
    }
}

 

Acima temos o me o mesmo exemplo, com acknowledgment.

Abaixo temos a comparação entre as duas estratégias:

 

A diferença é extremamente simples, se traduz em uma chamada a mais no código no entanto não tê-la é sinônimo de caos à vista.

Por fim temos a criação de um binding, que liga uma fila a uma exchange. Olhe o exemplo 3 do tutorial do RabbitMQ:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

No texto, em inglês, você encontra toda a narrativa que conta como e para que ele está criando a fila dinamicamente. Como nós já falamos sobre isso, fico aqui apenas com o código. Mas vou pontuar algumas linhas que considero relevantes para a explicação.

Linha 14: A exchange é criada. Note que é do tipo fanout (saiba mais).

Linha 16: O código está criando uma fila anônima. O RabbitMQ criará uma fila e entregará o nome como resposta.

Linha 17: Com o nome da fila, é criado um bind para a exchange (essa sim criada previamente).

 

Aviso

Por hora vou atender a outros projetos pessoais, portanto retomo a série após o carnaval.

 

Luiz Carlos Faria

Mensagem do Autor

Espero que goste desse post. Não deixe de comentar e falar o que achou. 

Se acha que esse post pode ajudar alguém que você conheça, compartilhe!

 

Eventos passados

agosto 2020
julho 2020
Nenhum evento encontrado!
Carregar Mais

Publicidade

Assine

7 Comentários

  1. Rafael

    Olá Luiz, sou iniciante, li seu artigo, entendi um pouco, mais estou com dúvida. Estou usando no meu projeto, um servidor ubuntu server, nginx como proxy reverso, .net core 2 linux, identityserver4 com postgresql para identidade do usuário..
    Eu tenho algumas dúvidas: Para construção de notificação de status do site e um aplicativo de chat privado .net core 2, é recomendável utilizar o Rabbitmq ou Redis? qual é melhor opção? Se for usar Rabbitmq, o envio teria que ser por direct? Qual seria a melhor forma de separar as mensagens privadas dos usuários que possuem guid e dos usuários anônimos dentro do Rabbitmq? teria que criar uma fila para cada usuário? me de um exemplo por favor?

    Responder
    • Luiz Carlos Faria

      Redis é um Cache, (até hoje não entendo como forçaram a barra para chamá-lo de NoSQL, mas tudo bem). Ele tem um pub/sub, que nunca usei, dificilmente usaria e não me vejo recomendando.

      É simples: A maioria das implantações de Redis, sequer trabalha com persistência, isso quer dizer que se um servidor morre, seus dados morrem junto.
      E quando você habilita persistência, tem outro problema: os ciclos de escrita, que em geral são em segundos.
      Isso quer dizer que em um cenário de queda/falha/shutdown ou reboot, os dados imputados ou alterados desde o último ciclo até o momento do shutdown são perdidos.

      Sendo assim, REDIS eu uso para cache, e somente cache passivo (alguém que sabe processar um recurso de alto IO, usa o Redis para não realizar a tarefa e mas quando necessário a faz, preenchendo o cache novamente).

      Dito isso, Redis não é uma opção para nenhum dos assuntos que listou. Pelo menos para mim. Para um chat e status, sinceramente eu usaria SignalR.

      De qualquer forma, RabbitMQ pode resolver suas questões com certa facilidade também, já que trabalha com Stomp e Mqtt protocolos de mensageria feitos para a web. Na documentação você encontra detalhes sobre os plugins e como habilitá-los. Eu tenho uma apresentação que mostra isso e tem um repositório com exemplos “Message Broker & .Net Core – Introdução ao RabbitMQ” https://gago.io/blog/message-broker-net-core-introducao-ao-rabbitmq/.

      Se você é iniciante, redobre o cuidado, há maior probabilidade de tomar decisões erradas do que certas, e revalidar suas próprias decisões quebra esse dogma da juventude. É sadio revisar as premissas que levaram a uma decisão.

      No mais obrigado por participar perguntando, é muito legal ver que está lendo, vi que já viu os 6 posts! Obrigado mesmo!

      Responder
  2. Henrique

    Oi Luiz, também sou iniciante em RabbitMQ e fiquei com algumas dúvidas referente ao Ack:

    – O que esse código faz?
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    O código acima complementa esse?
    channel.BasicConsume(queue: “hello”,
    autoAck: false,
    consumer: consumer);

    Responder
    • Luiz Carlos Faria

      Ack é uma abreviação de Acknowledge.

      Quando configuramos o consumo com AutoAck falso estamos buscando resiliência, isso quer dizer, estamos dizendo para o RabbitMQ para que ele só delete a mensagem da fila após receber um comando Ack (que é exatamente o que fazemos quando chamamos o método BasicAck do Model/Channel).

      Sim, são complementares.

      Quando trabalhamos com AutoAck true, é forma mais performática e não resiliente, onde a mensagem é deletada da fila no momento em que é entregue para o consumidor. Isso quer dizer que se algo ocorrer e falhar, a mensagem está perdida para todo o sempre!

      Ficou claro?

      Há bons cenários de uso para AutoAck true e AutoAck false, vale a pena uma googlada para dar uma olhada nisso.

      Responder
  3. Valdir

    Show, parabéns pelo artigo.
    Eu fiquei com uma dúvida, como faria para consumir as mensagens utilizando uma aplicação ASP.NET MVC ou uma API por exemplo? Não entendi como deixar essa aplicação escutando os eventos dessa fila.

    Responder
    • Luiz Carlos Faria

      Opa Valdir, obrigado.

      Uma aplicação web não é o melhor lugar para você hospedar o consumo de filas. Tanto a aplicação web, quanto o consumo de filas demandam escala. Por motivos diferentes, por demandas diferentes. E amarrar as duas causa, inevitavelmente o acúmulo de responsabilidades fazendo com que uma atrapalhe a outra.

      Dito isso, sim é possível sim, desde que não seja uma aplicação web antiga (cujo ciclo de vida é gerenciado pelo IIS).

      Se seu processo não sofre recycle, e você tem controle do ciclo de vida dele, pode usar uma aplicação web sim, embora eu não te aconselhe a fazer isso.

      Responder

Enviar um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.

Lives

Fique de olho nas lives

Fique de olho nas lives no meu canal do Youtube, no Canal .NET e nos Grupos do Facebook e Instagram.

Aceleradores

Existem diversas formas de viabilizar o suporte ao teu projeto. Seja com os treinamentos, consultoria, mentorias em grupo.

Agenda & Eventos

agosto

setembro 2020

outubro
DOM
SEG
TER
QUA
QUI
SEX
SAB
30
31
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1
2
3
Eventos para setembro

1st

Sem Eventos
Eventos para setembro

2nd

Sem Eventos
Eventos para setembro

3rd

Sem Eventos
Eventos para setembro

4th

Sem Eventos
Eventos para setembro

5th

Sem Eventos
Eventos para setembro

6th

Sem Eventos
Eventos para setembro

7th

Sem Eventos
Eventos para setembro

8th

Sem Eventos
Eventos para setembro

9th

Sem Eventos
Eventos para setembro

10th

Sem Eventos
Eventos para setembro

11th

Sem Eventos
Eventos para setembro

12th

Sem Eventos
Eventos para setembro

13th

Sem Eventos
Eventos para setembro

14th

Sem Eventos
Eventos para setembro

15th

Sem Eventos
Eventos para setembro

16th

Sem Eventos
Eventos para setembro

17th

Sem Eventos
Eventos para setembro

18th

Sem Eventos
Eventos para setembro

19th

Sem Eventos
Eventos para setembro

20th

Sem Eventos
Eventos para setembro

21st

Sem Eventos
Eventos para setembro

22nd

Sem Eventos
Eventos para setembro

23rd

Sem Eventos
Eventos para setembro

24th

Sem Eventos
Eventos para setembro

25th

Sem Eventos
Eventos para setembro

26th

Sem Eventos
Eventos para setembro

27th

Sem Eventos
Eventos para setembro

28th

Sem Eventos
Eventos para setembro

29th

Sem Eventos
Eventos para setembro

30th

Sem Eventos
Share This