Oragon.AspNetCore.Hosting.AMQP

Web API over AMQP

Ultima Release: 08/30/2018

Se olharmos com cuidado para o HTTP e AMQP conseguimos encontrar semelhanças das mais diversas. Headers, Body. Se olharmos sobre as implementações sob o HTTP que conhecemos, vemos também outras características comuns como Routing, parsing.

Fato que usar a infraestrutura base do ASP.NET Core, com Model Binders, Routing, Controllers seria interessante, mas trabalhar com esses elementos sob AMQP, aumenta muito a resiliência de nossas aplicações. É como inverter o conceito de load balancer, onde este, deixa propagar mensagens via novos requests, para entregar um modelo reativo consistente e extremamente resiliente.

Sob essa ótica esse projeto nasce como um experimento que permite colocar todo os processamento de filas sob o ASP.NET Core, como se fosse HTTP. Dividido em 2 partes, Server e Worker temos:

Server

A infraestrutura de server consiste em um middleware que permite interceptar requisições HTTP e colocá-las em filas.

using Oragon.AspNetCore.Hosting.AMQP;
using RabbitMQ.Client;

namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAProxy
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<IConnectionFactory>(new ConnectionFactory()
            {
                HostName = "192.168.1.88",
                UserName = "usuario",
                Password = "senha",
                VirtualHost = "exemplo_amqp",
                Port = 5672
            });
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseAmqp(it =>
                it.WithGroupName("dynamic_api")
                .WithRoute("GET", "/api/", Pattern.Rpc)
                .WithRoute("POST", "/api/", Pattern.FireAndForget)
                .WithRoute("PUT", "/api/", Pattern.FireAndForget)
                .WithRoute("DELETE", "/api/", Pattern.FireAndForget)
                .WithRoute("*", "/api/", Pattern.Ignore)
                .WithPoolSize(10)
                .WithConnectionFactory(app.ApplicationServices.GetRequiredService<IConnectionFactory>())
            );
            app.UseMvc();
        }
    }
}

Com base nos grupos, que são implementados implicitamente por exchanges e filas pelo RabbitMQ, conseguimos suportar:

  • Fire and Forget
  • Rpc
  • Ignore

Worker

Agora nos workers tratamos cada requisição. Dessa vez a mudança é feita na classe Program, onde substituímos o hosting padrão por um host de teste, programável.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Oragon.AspNetCore.Hosting.AMQP.Worker;
using RabbitMQ.Client;

namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            IWebHostBuilder config = CreateWebHostBuilder(args);
            AMQPWorkerBootstrapper.Run(config, builder =>
                builder
                .WithGroupName("dynamic_api")
                .WithConnectionFactory(new ConnectionFactory()
                {
                    HostName = "192.168.1.88",
                    UserName = "usuario",
                    Password = "senha",
                    VirtualHost = "exemplo_amqp",
                    Port = 5672
                })
                .WithPoolSize(10)
            );           
        }

        public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
                .UseStartup<Startup>();
    }
}

Como podem ver abaixo, uma implementação de controller web api continua a mesma.

using System.Collections.Generic;
using Microsoft.AspNetCore.Mvc;

namespace Oragon.AspNetCore.Hosting.AMQP.IntegratedTests.HAWorker.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        // GET api/values
        [HttpGet]
        public ActionResult<IEnumerable<string>> Get()
        {
            return new string[] { "worker", "worker" };
        }

        // GET api/values/5
        [HttpGet("{id}")]
        public ActionResult<string> Get(int id)
        {
            return "worker/get";
        }

        // POST api/values
        [HttpPost]
        public void Post([FromBody] string value)
        {
        }

        // PUT api/values/5
        [HttpPut("{id}")]
        public void Put(int id, [FromBody] string value)
        {
        }

        // DELETE api/values/5
        [HttpDelete("{id}")]
        public void Delete(int id)
        {
        }
    }
}

Origem

Esse projeto foi concebido como parte da infraestrutura do WBot da Ebix, plataforma de integração e desenvolvimento de ChatBots. Um dos problemas endereçados a essa solução era criar um modelo consistente de desenvolvimento, com o máximo de confiabilidade, digno de um ambiente corporativo de alta capacidade.

Indicações

Essa solução pode ser indicada para integrações entre sistemas, permitindo que workers estejam offline e retomem seu trabalho após um novo deploy ou quando preferir.

Isso é válido para requisições Fire and Forget.

Requisições Rpc, precisam de um worker funcional ou ocasionarão timeout.

Vale a pena segmentar workers em Workers RPC e workers Fire and Forget quando houver alto downtime de workers. Caso haja segundos ou poucos minutos de donwtime, não há necessidade de segmentação.

Know Issues

  • Para chamadas RPC, falta implementar TimeOut configurável. Hoje está sendo usado o timeout padrão do RabbitMQ.Client.MessagePatterns.SimpleRpcClient que usa TimeOut.Infinite para gerenciar o aguardo da resposta.
  • A implantação do SonarQube não está contemplando todas as regras e nesse momento não está avaliando cobertura de código.
Share This