DEV Community

Jaderson Brandão
Jaderson Brandão

Posted on

Integração entre APIs Spring Boot com Apache Kafka

Neste artigo vamos entender alguns conceitos básicos do Apache Kafka, aprender a configurá-lo de forma básica no Spring Boot e aplicar esses conhecimentos em um exemplo prático.

O que é o Apache Kafka?
Apache Kafka é uma plataforma de stremming distribuída para construir pipelines de dados em tempo real. Ele funciona como um sistema de mensageria baseado em logs e é ideal para casos onde o volume de mensagens é alto e é necessário garantir a entrega ordenada e escalável.

Conceitos pricipais:

  • Producer: quem publica mensagens no Kafka.

  • Consumer: quem consome mensagens de um tópico.

  • Topic: canal onde as mensagens são agrupadas.

  • Partition: subdivisão do tópico para paralelismo.

  • Offset: identifica a posição da mensagem na partição.

  • Broker: servidor Kafka que armazena os dados.

  • Zookeeper: utilizado para gerenciamento do cluster(a partir do Kafka 2.8 pode ser opcional).

Criando o Projeto Spring Boot
Crie dois projetos Spring Boot:

  • Api Paciente: que será o Producer
  • Api Agendamento: que será o Consumer

Inclua a dependência no pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Exemplo Prático: Agendamento de Consultas
Cenário:

  • A API do Paciente envia uma solicitação de agendamento para um tópico Kafka.
  • A API de Agendamento consome essa mensagem e processa o agendamento.

🗂️ Estrutura do Projeto (API Paciente - Producer)

src
└── main
    └── java
        └── br
            └── com
                └── projeto
                    ├── config
                    │   └── KafkaProducerConfig.java
                    ├── consulta
                    │   ├── ConsultaDTO.java
                    │   ├── ConsultaProducer.java
                    │   └── ConsultaResource.java

Enter fullscreen mode Exit fullscreen mode
  • KafkaProducerConfig: configura o KafkaTemplate para serialização de mensagens.
  • ConsultaDTO: estrutura da mensagem que será enviada ao Kafka.
  • ConsultaProducer: serviço responsável por publicar no tópico Kafka.
  • ConsultaResource: endpoint REST que simula o envio da mensagem.

Configuração do application.yml:

spring:
  application:
    name: api-paciente-producer
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Enter fullscreen mode Exit fullscreen mode

🧩 key-serializer: org.apache.kafka.common.serialization.StringSerializer

  • Essa configuração define que a chave da mensagem Kafka será serializada como uma string.

🧩 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

  • Define que o conteúdo da mensagem (o valor) será convertido em JSON antes de ser enviado.
  • Útil quando você quer identificar ou particionar mensagens com uma chave textual, como "idPaciente123".

💡 Exemplo prático
Se você enviar isso:

kafkaTemplate.send("agendamento-consultas", new ConsultaDTO(1L, LocalDateTime.now(), "Cardiologia"));
Enter fullscreen mode Exit fullscreen mode

O Kafka enviará uma mensagem parecida com:

{
  "idPaciente": 1,
  "dataHora": "2025-04-21T15:00:00",
  "especialidade": "Cardiologia"
}

Enter fullscreen mode Exit fullscreen mode

Estrutura da mensagem:

public record ConsultaDTO(Long idPaciente, LocalDateTime dataHora, String especialidade) {}
Enter fullscreen mode Exit fullscreen mode

Condiguração do Producer:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, ConsultaDTO> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, ConsultaDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Enter fullscreen mode Exit fullscreen mode
  • BOOTSTRAP_SERVERS_CONFIG: endereço do broker Kafka.
  • KEY_SERIALIZER_CLASS_CONFIG: indica que a chave da mensagem será serializada como String.
  • VALUE_SERIALIZER_CLASS_CONFIG: define que o valor será convertido em JSON automaticamente.

OBS: Essa configuração facilita a leitura e integração entre sistemas. Ela permite que o Kafka envie objetos Java serializados como JSON.

Criando o service que vai enviar as mensagens:

@Service
public class ConsultaProducer {

    private final KafkaTemplate<String, ConsultaDTO> kafkaTemplate;

    public ConsultaProducer(KafkaTemplate<String, ConsultaDTO> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void enviarAgendamentoConsulta(ConsultaDTO consulta) {
        this.kafkaTemplate.send("agendamento-consultas", consulta);
    }
}
Enter fullscreen mode Exit fullscreen mode
  • KafkaTemplate: encapsula um produtor e fornece métodos para enviar dados aos tópicos Kafka.

** Criando o Endpoint para agendamento:**

@RestController
@RequestMapping("/api/v1/consultas")
public class ConsultaResource {

    private final ConsultaProducer producer;
    public ConsultaResource(ConsultaProducer producer) {
        this.producer = producer;
    }
    @PostMapping
    public void agendarConsulta(@RequestBody ConsultaDTO dto) {
        this.producer.enviarAgendamentoConsulta(dto);
    }
}
Enter fullscreen mode Exit fullscreen mode

🗂️ Estrutura do Projeto (API Agendamento- Consumer)

src
└── main
    └── java
        └── br
            └── com
                └── projeto
                    ├── config
                    │   └── KafkaConsumerConfig.java
                    ├── consulta
                    │   ├── ConsultaDTO.java
                    │   ├── ConsultaConsumer.java

Enter fullscreen mode Exit fullscreen mode

Configuração do arquivo application.yml:

spring:
  application:
    name: api-agendamento-consumer
  kafka:
    consumer:
      group-id: grupo-agendamento
      key-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: '*'
server:
  port: 8081
Enter fullscreen mode Exit fullscreen mode

Vamos criar nossa classe de configuração.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, ConsultaDTO> consumerFactory() {
        JsonDeserializer<ConsultaDTO> deserializer = new JsonDeserializer<>(ConsultaDTO.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "grupo-agendamento");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConsultaDTO> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ConsultaDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Enter fullscreen mode Exit fullscreen mode
  • No consumidor precisamos usar o @EnableKafka.
  • Como estou ulizando record, tiver que incluir o bloco JsonDeserializer no Bean ConsumerFactory.

props.put(ConsumerConfig.GROUP_ID_CONFIG, "grupo-agendamento");
Essa propriedade define o ID do grupo de consumidores ao qual esse consumidor Kafka pertence.
No kafka quando múltiplas instâncias de cosumidores estão no mesmo grupo( mesmo group.id), o Kafka garante que cada mensagem será lida por apenas um menbro do grupo, permitindo balanceamento de carga escalabilidade.
Se você tiver várias instâncias da API de agendamento rodando, cada uma receberá apenas parte das mensagens — evitando duplicidade no processamento.

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConsultaDTO> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ConsultaDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
Enter fullscreen mode Exit fullscreen mode

Esse bean é responsável por criar o container que gerencia os listeners Kafka na aplicação.
Ele oferece suporte a concorrência, permitindo processar múltiplas mensagens em paralelo.
Sem esse bean, o spring não saberia como instanciar corretamente os métodos anotados com @KafkaListener

Vamos criar nossa classe service para processar o agendamento:

@Service
public class ConsultaConsumer {

    @KafkaListener(topics = "agendamento-consultas", groupId = "grupo-agendamento")
    public void processaAgendamentoConsulta(ConsultaDTO consulta) {
        System.out.println("Consulta recebida: " + consulta);
    }

}

Enter fullscreen mode Exit fullscreen mode

Aqui termimos o desenvolvimento básico das nossas APIs.

Para fins de desenvolvimento local(espero que tenha o Docker instalado), podemos rodar o Kafka com Docker utilizando o seguinte arquivo docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode
  • No terminal dentro da pasta onde está o docker-compose rodar o resguinte comando: docker-compose up -d
  • Após rodar os comando podemos subir nossas APIs(em portas diferentes) e testar o envio das mensagens.

  • Teste via postman:

Image description

  • Conclusão

Com Apache Kafka, conseguimos criar uma arquitura de comunicação assíncrona entre APIs de forma escalável e resiliente. Nas próximas postagens, abordaremos como realizar essa mesma comunicaçao utilizando RabbittMQ e Apche Camel.

O exemplo completo está disponível no Github: Git

Top comments (0)