Se olharmos com cuidado para o HTTP e AMQP conseguimos encontrar semelhanças das mais diversas. Headers, Body. Se olharmos sobre as implementações sob o HTTP que conhecemos, vemos também outras características comuns como Routing, parsing.
Fato que usar a infraestrutura base do ASP.NET Core, com Model Binders, Routing, Controllers seria interessante, mas trabalhar com esses elementos sob AMQP, aumenta muito a resiliência de nossas aplicações. É como inverter o conceito de load balancer, onde este, deixa propagar mensagens via novos requests, para entregar um modelo reativo consistente e extremamente resiliente.
Sob essa ótica esse projeto nasce como um experimento que permite colocar todo os processamento de filas sob o ASP.NET Core, como se fosse HTTP. Dividido em 2 partes, Server e Worker temos:
Server
A infraestrutura de server consiste em um middleware que permite interceptar requisições HTTP e colocá-las em filas.
using Oragon.AspNetCore.Hosting.AMQP;
using RabbitMQ.Client;
namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAProxy
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IConnectionFactory>(new ConnectionFactory()
{
HostName = "192.168.1.88",
UserName = "usuario",
Password = "senha",
VirtualHost = "exemplo_amqp",
Port = 5672
});
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseAmqp(it =>
it.WithGroupName("dynamic_api")
.WithRoute("GET", "/api/", Pattern.Rpc)
.WithRoute("POST", "/api/", Pattern.FireAndForget)
.WithRoute("PUT", "/api/", Pattern.FireAndForget)
.WithRoute("DELETE", "/api/", Pattern.FireAndForget)
.WithRoute("*", "/api/", Pattern.Ignore)
.WithPoolSize(10)
.WithConnectionFactory(app.ApplicationServices.GetRequiredService<IConnectionFactory>())
);
app.UseMvc();
}
}
}
Com base nos grupos, que são implementados implicitamente por exchanges e filas pelo RabbitMQ, conseguimos suportar:
- Fire and Forget
- Rpc
- Ignore
Worker
Agora nos workers tratamos cada requisição. Dessa vez a mudança é feita na classe Program, onde substituímos o hosting padrão por um host de teste, programável.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Oragon.AspNetCore.Hosting.AMQP.Worker;
using RabbitMQ.Client;
namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker
{
public class Program
{
public static void Main(string[] args)
{
IWebHostBuilder config = CreateWebHostBuilder(args);
AMQPWorkerBootstrapper.Run(config, builder =>
builder
.WithGroupName("dynamic_api")
.WithConnectionFactory(new ConnectionFactory()
{
HostName = "192.168.1.88",
UserName = "usuario",
Password = "senha",
VirtualHost = "exemplo_amqp",
Port = 5672
})
.WithPoolSize(10)
);
}
public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
}
}
Como podem ver abaixo, uma implementação de controller web api continua a mesma.
using System.Collections.Generic;
using Microsoft.AspNetCore.Mvc;
namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ValuesController : ControllerBase
{
// GET api/values
[HttpGet]
public ActionResult<IEnumerable<string>> Get()
{
return new string[] { "worker", "worker" };
}
// GET api/values/5
[HttpGet("{id}")]
public ActionResult<string> Get(int id)
{
return "worker/get";
}
// POST api/values
[HttpPost]
public void Post([FromBody] string value)
{
}
// PUT api/values/5
[HttpPut("{id}")]
public void Put(int id, [FromBody] string value)
{
}
// DELETE api/values/5
[HttpDelete("{id}")]
public void Delete(int id)
{
}
}
}
Origem
Esse projeto foi concebido como parte da infraestrutura do WBot da Ebix, plataforma de integração e desenvolvimento de ChatBots. Um dos problemas endereçados a essa solução era criar um modelo consistente de desenvolvimento, com o máximo de confiabilidade, digno de um ambiente corporativo de alta capacidade.
Indicações
Essa solução pode ser indicada para integrações entre sistemas, permitindo que workers estejam offline e retomem seu trabalho após um novo deploy ou quando preferir.
Isso é válido para requisições Fire and Forget.
Requisições Rpc, precisam de um worker funcional ou ocasionarão timeout.
Vale a pena segmentar workers em Workers RPC e workers Fire and Forget quando houver alto downtime de workers. Caso haja segundos ou poucos minutos de donwtime, não há necessidade de segmentação.
Know Issues
- Para chamadas RPC, falta implementar TimeOut configurável. Hoje está sendo usado o timeout padrão do RabbitMQ.Client.MessagePatterns.SimpleRpcClient que usa TimeOut.Infinite para gerenciar o aguardo da resposta.
- A implantação do SonarQube não está contemplando todas as regras e nesse momento não está avaliando cobertura de código.

0 comentários