30 de outubro de 2023

(Microservices) Como utilizar mensageria com Apache Kafka

Introduzir conceitos comunicação assíncrona com mensageria com Apache Kafka

Introdução

Asynchronous

Mensageria (Streaming) é uma método de comunicação entre serviços em que é utilizado logs por meio de eventos. É uma alternativa a comunicação feita com REST. Esse tipo de comunicação é utilizado por soluções como Apache Kafka, RabbitMQ e Redis Streams, que vou chamar de Message Broker. Essa comunicação é um método utilizado na arquitetura de microsserviços por permitir o desacoplamento entre os microserviços. Isso vai ser explicado mais abaixo.

Para este exemplo utilizarei dois projetos em andamentos, um em Java com Spring Boot e outro com C# em ASP.NET MVC. O motivo de utilizar linguagens diferentes é mostrar que é possível, mas a linguagem não é importante nesse caso.

Como a comunicação entre serviços funcionam

Síncrona

Request-Reply

A comunicação síncrona é aquela em que um serviço A faz uma chamada a outro serviço B, mas tem que esperar a resposta do serviço B para dar continuidade, assim ficando bloqueado até que a resposta seja recebida. Isso é chamado de Synchronous Blocking. Para que a chamada síncrona ocorra de forma correta, os dois microsserviços A e B precisam estar rodando. O que tornam um dependente do outro.

Assíncrona

Asynchronous

A comunicação assíncrona é o inverso da síncrona. Quando um serviço A faz uma “chamada” para o serviço B ele não fica aguardando a resposta retornar, a aplicação continua. Eventualmente o serviço B vai ser receber a requisição feita pelo o serviço A e processar. Não necessariamente precisa retornar uma resposta, mas se precisar pode ser feito.

Se caso o serviço que precisa receber a chamada não estiver rodando, assim que estiver, ele vai receber todas as chamadas feitas enquanto esteve ausente. E dessa forma os dois serviços se tornam independentes e desacoplados (Loosely Coupled).

Nessa arquitetura não feita uma requisição ou chamada, e sim uma mensagem é enviada. Essa mensagem pode conter, por exemplo, um conteúdo JSON.

{
"id": "d07bde9b-39f3-473c-9b6c-3e91f805e4a6",
"status": "Not sent",
"productId": "7be37c94-ad72-48e6-bcb0-babdcc7cf314",
"createdAt": "2023-10-30T09:35:26.0446169-03:00",
"updatedAt": null,
"canceledAt": null
}

A solução utilizada foi com Apache Kafka como Message Broker.

Para saber mais sobre isso recomendo o livro Building Microservices: Designing Fine-Grained Systems do Sam Newman

Microservices

Microsserviços são serviços em que a release ocorre independentemente e são modelados a partir do domínio do negócio.

Nesse exemplo terei dois microsserviços que se comunicará de forma assíncrona para fazer um compartilhamento de dados. O microsserviço de pedidos irá compartilhar dados de um pedido feito e eventualmente o microsserviço de entrega irá receber e cadastrar em seu banco de dados.

Asynchronous

Como Message Broker utiliza Topics

O Message Broker vai armazenar as mensagens em uma estrutura de dados chamada Topic para então o Consumer ou Subscriber ler a mensagem. Especialmente no Kafka, as mensagens não são deletadas assim que lidas, elas permanecem armazenadas por X tempo. O padrão é de 1 semana, esse aspecto é chamado no Kafka de Message Retention.

Um Topic pode ser visto como uma Queue de estrutura de dados com logs armazenados. Topics podem ser nomeados e podem ter 0, 1 ou muitos Producer e Consumer.

Producer e Consumer

Ou também o Bounded-Buffer Problem é um problema descrito por Dijkstra. Esse é um problema que foi resolvido com uma Queue em que é descrito que há:

  • 2 processos que são chamados de Producer e Consumer
  • Producer é um processo cíclico que a cada ciclo produz uma porção de informação
  • Consumer é um processo cíclico que a cada ciclo consome a próxima porção de informação produzida pelo Producer

Um Message Broker utilizada dessa arquitetura como solução. No entendo, é utilizado de forma distribuída com TCP/IP para permitir essa comunicação a nível de Virtual Machines (VM).

Essa não é a única arquitetura de comunicação de um Message Broker, também há Pub/Sub.

No código

Para iniciar o Kafka utilizei Docker e Docker-compose para um único servidor Kafka para esse exemplo.

kafka:
container_name: ordering-queue
image: 'bitnami/kafka:latest'
hostname: suetham-workplace
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
ports:
- 9092:9092

e com o comando para iniciar o container de um docker-compose.yml

Terminal window
docker compose up -d kafka

Finalmente o comando para criar o Topic “ordering” para o Kafka que está no container recém-criado “ordering-queue”

Terminal window
docker exec ordering-queue kafka-topics.sh --bootstrap-server localhost:9092 --topic ordering --create --partitions 3 --replication-factor 1

No código abaixo está um Consumer utilizando o framework Java Spring Boot (qualquer linguagem vai utilizar os mesmos conceitos!) que irá executar uma classe Service a partir do conteúdo em JSON recebido. Na annotation KafkaListener está definido o nome do Topic, “ordering” e um groupId para determinar instâncias de aplicação em um determinado grupo, assim permitindo que haja um Load Balancer para os Consumers e dividir a carga de mensagens entre N instâncias.

package com.deliveryapirest.consumer;
import com.deliveryapirest.data.Order;
import com.deliveryapirest.data.OrderStatus;
import com.deliveryapirest.services.RegisterOrderToShipService;
import com.deliveryapirest.typeAdapters.GsonOptionalAdapter;
import com.deliveryapirest.typeAdapters.GsonZonedDateTimeAdapter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
record OrderToConsume(
UUID id,
UUID productId,
int status,
ZonedDateTime createdAt,
Optional<ZonedDateTime> updatedAt,
Optional<ZonedDateTime> canceledAt) {}
@Component
public class OrderingConsumer {
RegisterOrderToShipService registerOrderToShipService;
public OrderingConsumer(RegisterOrderToShipService registerOrderToShipService) {
this.registerOrderToShipService = registerOrderToShipService;
}
@KafkaListener(topics = "ordering", groupId = "orderingGroup")
public void checkOrder(String content) {
OrderToConsume orderToConsume = receiveAndSerializeContent(content);
var order = convertOrderToConsumeToOrderObject(orderToConsume);
this.registerOrderToShipService.register(order);
}
private OrderToConsume receiveAndSerializeContent(String content) {
Gson gson =
new GsonBuilder()
.registerTypeAdapter(ZonedDateTime.class, new GsonZonedDateTimeAdapter())
.registerTypeAdapterFactory(GsonOptionalAdapter.FACTORY)
.create();
var contentInJson = gson.fromJson(content, OrderToConsume.class);
return contentInJson;
}
private Order convertOrderToConsumeToOrderObject(OrderToConsume orderToConsume) {
OrderStatus orderStatusInEnum = OrderStatus.fromInt(orderToConsume.status());
var order =
new Order(
orderToConsume.id(),
orderToConsume.productId(),
1,
orderStatusInEnum,
orderToConsume.createdAt(),
orderToConsume.updatedAt(),
orderToConsume.canceledAt());
return order;
}
}

O Producer o código é mais simples, na classe abaixo inicio uma nova configuração para o Kafka com o ClientId para debugging e BootstrapServers para definir os servidores do Kafka através de URL. Após isso é transformado em JSON o conteúdo da mensagem e enviado para o Message Broker.

namespace OrderingApi.Producers;
using System.Net;
using System.Text.Json;
using Confluent.Kafka;
using OrderingApi.Config;
public class OrderingKafkaProducer : OrderingProducer
{
public async Task<bool> SendOrderThroughMessageQueue(string topic, OrderToProduce order)
{
var config = new ProducerConfig
{
BootstrapServers = Env.KAFKA_URL,
ClientId = Dns.GetHostName()
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
var orderInJson = JsonSerializer.Serialize<OrderToProduce>(order);
var result = await producer.ProduceAsync(
topic,
new Message<Null, string> { Value = orderInJson }
);
return await Task.FromResult(true);
}
}
}

Conclusão

Me surpreendi como Message Broker é um conceito de tecnologia que herda de conceitos muito básicos e antigos como File System, Queue, Pub/Sub, Decoupling e Interprocess Communication. Esse foi um exemplo de um dos casos de uso de um Message Broker, mas com isso deu para entender o funcionamento básico e seu caso de uso com Microservices.

O projeto em Spring Boot e ASP.NET MVC pode ser encontrado no meu Github

Fiquem em paz

Logo do GitHub Made in Astro