fbpx
Publicado em: domingo, 12 de abr de 2020
Oragon.AspNetCore.Hosting.AMQP

Se olharmos com cuidado para o HTTP e AMQP conseguimos encontrar semelhanças das mais diversas. Headers, Body. Se olharmos sobre as implementações sob o HTTP que conhecemos, vemos também outras características comuns como Routing, parsing.

Fato que usar a infraestrutura base do ASP.NET Core, com Model Binders, Routing, Controllers seria interessante, mas trabalhar com esses elementos sob AMQP, aumenta muito a resiliência de nossas aplicações. É como inverter o conceito de load balancer, onde este, deixa propagar mensagens via novos requests, para entregar um modelo reativo consistente e extremamente resiliente.

Sob essa ótica esse projeto nasce como um experimento que permite colocar todo os processamento de filas sob o ASP.NET Core, como se fosse HTTP. Dividido em 2 partes, Server e Worker temos:

Server

A infraestrutura de server consiste em um middleware que permite interceptar requisições HTTP e colocá-las em filas.

using Oragon.AspNetCore.Hosting.AMQP;
using RabbitMQ.Client;

namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAProxy
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<IConnectionFactory>(new ConnectionFactory()
            {
                HostName = "192.168.1.88",
                UserName = "usuario",
                Password = "senha",
                VirtualHost = "exemplo_amqp",
                Port = 5672
            });
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseAmqp(it =>
                it.WithGroupName("dynamic_api")
                .WithRoute("GET", "/api/", Pattern.Rpc)
                .WithRoute("POST", "/api/", Pattern.FireAndForget)
                .WithRoute("PUT", "/api/", Pattern.FireAndForget)
                .WithRoute("DELETE", "/api/", Pattern.FireAndForget)
                .WithRoute("*", "/api/", Pattern.Ignore)
                .WithPoolSize(10)
                .WithConnectionFactory(app.ApplicationServices.GetRequiredService<IConnectionFactory>())
            );
            app.UseMvc();
        }
    }
}

Com base nos grupos, que são implementados implicitamente por exchanges e filas pelo RabbitMQ, conseguimos suportar:

  • Fire and Forget
  • Rpc
  • Ignore

Worker

Agora nos workers tratamos cada requisição. Dessa vez a mudança é feita na classe Program, onde substituímos o hosting padrão por um host de teste, programável.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Oragon.AspNetCore.Hosting.AMQP.Worker;
using RabbitMQ.Client;

namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            IWebHostBuilder config = CreateWebHostBuilder(args);
            AMQPWorkerBootstrapper.Run(config, builder =>
                builder
                .WithGroupName("dynamic_api")
                .WithConnectionFactory(new ConnectionFactory()
                {
                    HostName = "192.168.1.88",
                    UserName = "usuario",
                    Password = "senha",
                    VirtualHost = "exemplo_amqp",
                    Port = 5672
                })
                .WithPoolSize(10)
            );           
        }

        public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
                .UseStartup<Startup>();
    }
}

Como podem ver abaixo, uma implementação de controller web api continua a mesma.

using System.Collections.Generic;
using Microsoft.AspNetCore.Mvc;

namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        // GET api/values
        [HttpGet]
        public ActionResult<IEnumerable<string>> Get()
        {
            return new string[] { "worker", "worker" };
        }

        // GET api/values/5
        [HttpGet("{id}")]
        public ActionResult<string> Get(int id)
        {
            return "worker/get";
        }

        // POST api/values
        [HttpPost]
        public void Post([FromBody] string value)
        {
        }

        // PUT api/values/5
        [HttpPut("{id}")]
        public void Put(int id, [FromBody] string value)
        {
        }

        // DELETE api/values/5
        [HttpDelete("{id}")]
        public void Delete(int id)
        {
        }
    }
}

Origem

Esse projeto foi concebido como parte da infraestrutura do WBot da Ebix, plataforma de integração e desenvolvimento de ChatBots. Um dos problemas endereçados a essa solução era criar um modelo consistente de desenvolvimento, com o máximo de confiabilidade, digno de um ambiente corporativo de alta capacidade.

Indicações

Essa solução pode ser indicada para integrações entre sistemas, permitindo que workers estejam offline e retomem seu trabalho após um novo deploy ou quando preferir.

Isso é válido para requisições Fire and Forget.

Requisições Rpc, precisam de um worker funcional ou ocasionarão timeout.

Vale a pena segmentar workers em Workers RPC e workers Fire and Forget quando houver alto downtime de workers. Caso haja segundos ou poucos minutos de donwtime, não há necessidade de segmentação.

Know Issues

  • Para chamadas RPC, falta implementar TimeOut configurável. Hoje está sendo usado o timeout padrão do RabbitMQ.Client.MessagePatterns.SimpleRpcClient que usa TimeOut.Infinite para gerenciar o aguardo da resposta.
  • A implantação do SonarQube não está contemplando todas as regras e nesse momento não está avaliando cobertura de código.

0 comentários

Enviar um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.

Lives

Fique de olho nas lives

Fique de olho nas lives no meu canal do Youtube, no Canal .NET e nos Grupos do Facebook e Instagram.

Aceleradores

Existem diversas formas de viabilizar o suporte ao teu projeto. Seja com os treinamentos, consultoria, mentorias em grupo.