Se olharmos com cuidado para o HTTP e AMQP conseguimos encontrar semelhanças das mais diversas. Headers, Body. Se olharmos sobre as implementações sob o HTTP que conhecemos, vemos também outras características comuns como Routing, parsing.
Fato que usar a infraestrutura base do ASP.NET Core, com Model Binders, Routing, Controllers seria interessante, mas trabalhar com esses elementos sob AMQP, aumenta muito a resiliência de nossas aplicações. É como inverter o conceito de load balancer, onde este, deixa propagar mensagens via novos requests, para entregar um modelo reativo consistente e extremamente resiliente.
Sob essa ótica esse projeto nasce como um experimento que permite colocar todo os processamento de filas sob o ASP.NET Core, como se fosse HTTP. Dividido em 2 partes, Server e Worker temos:
Server
A infraestrutura de server consiste em um middleware que permite interceptar requisições HTTP e colocá-las em filas.
using Oragon.AspNetCore.Hosting.AMQP; using RabbitMQ.Client; namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAProxy { public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { services.AddSingleton<IConnectionFactory>(new ConnectionFactory() { HostName = "192.168.1.88", UserName = "usuario", Password = "senha", VirtualHost = "exemplo_amqp", Port = 5672 }); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); } public void Configure(IApplicationBuilder app, IHostingEnvironment env) { app.UseAmqp(it => it.WithGroupName("dynamic_api") .WithRoute("GET", "/api/", Pattern.Rpc) .WithRoute("POST", "/api/", Pattern.FireAndForget) .WithRoute("PUT", "/api/", Pattern.FireAndForget) .WithRoute("DELETE", "/api/", Pattern.FireAndForget) .WithRoute("*", "/api/", Pattern.Ignore) .WithPoolSize(10) .WithConnectionFactory(app.ApplicationServices.GetRequiredService<IConnectionFactory>()) ); app.UseMvc(); } } }
Com base nos grupos, que são implementados implicitamente por exchanges e filas pelo RabbitMQ, conseguimos suportar:
- Fire and Forget
- Rpc
- Ignore
Worker
Agora nos workers tratamos cada requisição. Dessa vez a mudança é feita na classe Program, onde substituímos o hosting padrão por um host de teste, programável.
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Oragon.AspNetCore.Hosting.AMQP.Worker; using RabbitMQ.Client; namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker { public class Program { public static void Main(string[] args) { IWebHostBuilder config = CreateWebHostBuilder(args); AMQPWorkerBootstrapper.Run(config, builder => builder .WithGroupName("dynamic_api") .WithConnectionFactory(new ConnectionFactory() { HostName = "192.168.1.88", UserName = "usuario", Password = "senha", VirtualHost = "exemplo_amqp", Port = 5672 }) .WithPoolSize(10) ); } public static IWebHostBuilder CreateWebHostBuilder(string[] args) => WebHost.CreateDefaultBuilder(args) .UseStartup<Startup>(); } }
Como podem ver abaixo, uma implementação de controller web api continua a mesma.
using System.Collections.Generic; using Microsoft.AspNetCore.Mvc; namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker.Controllers { [Route("api/[controller]")] [ApiController] public class ValuesController : ControllerBase { // GET api/values [HttpGet] public ActionResult<IEnumerable<string>> Get() { return new string[] { "worker", "worker" }; } // GET api/values/5 [HttpGet("{id}")] public ActionResult<string> Get(int id) { return "worker/get"; } // POST api/values [HttpPost] public void Post([FromBody] string value) { } // PUT api/values/5 [HttpPut("{id}")] public void Put(int id, [FromBody] string value) { } // DELETE api/values/5 [HttpDelete("{id}")] public void Delete(int id) { } } }
Origem
Esse projeto foi concebido como parte da infraestrutura do WBot da Ebix, plataforma de integração e desenvolvimento de ChatBots. Um dos problemas endereçados a essa solução era criar um modelo consistente de desenvolvimento, com o máximo de confiabilidade, digno de um ambiente corporativo de alta capacidade.
Indicações
Essa solução pode ser indicada para integrações entre sistemas, permitindo que workers estejam offline e retomem seu trabalho após um novo deploy ou quando preferir.
Isso é válido para requisições Fire and Forget.
Requisições Rpc, precisam de um worker funcional ou ocasionarão timeout.
Vale a pena segmentar workers em Workers RPC e workers Fire and Forget quando houver alto downtime de workers. Caso haja segundos ou poucos minutos de donwtime, não há necessidade de segmentação.
Know Issues
- Para chamadas RPC, falta implementar TimeOut configurável. Hoje está sendo usado o timeout padrão do RabbitMQ.Client.MessagePatterns.SimpleRpcClient que usa TimeOut.Infinite para gerenciar o aguardo da resposta.
- A implantação do SonarQube não está contemplando todas as regras e nesse momento não está avaliando cobertura de código.
0 comentários