Zookeeper & Kafka with Docker and Spring Boot

7 min read | Nov 13, 2022


In this article, we will show you how to set up and run Apache Kafka locally to speed up your development.

Zookeeper & Kafka Communication

Step 1: Setup Docker

docker-compose.yml configuration

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 22181:2181
    volumes:
      - ./pub-technology/zookeeper/data:/data
      - ./pub-technology/zookeeper/datalog:/datalog
  kafka-server-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server-1:9092,PLAINTEXT_HOST://localhost:29092
    volumes:
      - ./pub-technology/kafka1/data:/var/lib/kafka/data
  kafka-server-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server-2:9092,PLAINTEXT_HOST://localhost:39092
    volumes:
      - ./pub-technology/kafka2/data:/var/lib/kafka/data
  manager:
    image: sheepkiller/kafka-manager
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - zookeeper

From the configuration file, we can see that a Zookeeper server coordinates our Kafka cluster. In this setup we start one Zookeeper server, 2 Kafka nodes, and a Kafka manager to support the demonstration.

  • The docker-compose.yml file defines 4 services: zookeeper, kafka-server-1, kafka-server-2, and manager.
  • The Zookeeper server must always start before the 2 Kafka servers and the Kafka manager, and stop after them, so each of those services declares a depends_on entry for zookeeper. Note that depends_on only controls the start and stop order; it does not wait for Zookeeper to be ready.
  • The Zookeeper server listens on port 2181 to manage our Kafka servers. Services in the same Docker network reach it on that port, while clients running on the host reach it on port 22181, which is why the zookeeper service maps the ports as 22181:2181.
  • The 2 Kafka servers are exposed to applications on the host on ports 29092 and 39092. Inside the Docker network, each broker is advertised on port 9092 instead, as configured in the KAFKA_ADVERTISED_LISTENERS environment variable.
  • Service names and KAFKA_BROKER_ID values are unique across the services.
  • The Kafka manager runs on port 9000.
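
To make the two-listener setup more concrete, here is a minimal sketch in plain Java that splits the KAFKA_ADVERTISED_LISTENERS values from the configuration into listener names and addresses, then builds the bootstrap address a client on the host could use. The class AdvertisedListeners and its methods are hypothetical helpers written for this illustration; they are not part of Kafka or of the demo project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka or the demo repo.
class AdvertisedListeners {

    // Splits "NAME://host:port,NAME2://host:port" into a map of name -> host:port.
    static Map<String, String> parse(String value) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Not a listener entry: " + entry);
            }
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    // Collects the address of one listener name from every broker, comma separated.
    static String bootstrapFor(String listenerName, List<String> advertisedValues) {
        List<String> addresses = new ArrayList<>();
        for (String value : advertisedValues) {
            String address = parse(value).get(listenerName);
            if (address == null) {
                throw new IllegalArgumentException("No listener named " + listenerName);
            }
            addresses.add(address);
        }
        return String.join(",", addresses);
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList(
                "PLAINTEXT://kafka-server-1:9092,PLAINTEXT_HOST://localhost:29092",
                "PLAINTEXT://kafka-server-2:9092,PLAINTEXT_HOST://localhost:39092");
        // Other containers in the Docker network use the PLAINTEXT addresses
        System.out.println("containers: " + bootstrapFor("PLAINTEXT", brokers));
        // Applications on the host use the PLAINTEXT_HOST addresses
        System.out.println("host:       " + bootstrapFor("PLAINTEXT_HOST", brokers));
    }
}
```

The host line is the kind of value an application running outside Docker would set as its bootstrap address, while the containers line only resolves inside the Docker network.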

Step 2: Start application

Start our containers using the docker-compose command:

docker-compose up -d

Creating network "hello-kafka-server_default" with the default driver
Creating hello-kafka-server_zookeeper_1 ... done
Creating hello-kafka-server_kafka-server-1_1 ... done
Creating hello-kafka-server_kafka-server-2_1 ... done
Creating hello-kafka-server_manager_1... done

Make sure all services started successfully and verify that each one is listening on the port given in the configuration.

Use the netcat command to test the connection to each port:

  • Zookeeper ( port 22181 )
  • Kafka Node #1 ( port 29092 )
  • Kafka Node #2 ( port 39092 )
  • Kafka Manager ( port 9000 )
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 22181
Connection to localhost port 22181 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 29092
Connection to localhost port 29092 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 39092
Connection to localhost port 39092 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 9000
Connection to localhost port 9000 [tcp/cslistener] succeeded!
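
If netcat is not available, the same check can be sketched in plain Java with a socket connection. The PortCheck class below is a hypothetical helper written for this article, and the one-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration: reports whether a TCP port accepts connections.
class PortCheck {

    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if nothing is listening or the timeout expires
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host ports published by docker-compose.yml
        int[] ports = {22181, 29092, 39092, 9000};
        for (int port : ports) {
            System.out.println("localhost:" + port + " open = " + isOpen("localhost", port, 1000));
        }
    }
}
```

Like nc -z, this only proves that something accepts TCP connections on the port; it does not prove that the service behind it is healthy, which is why we also check the logs.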

Use the docker-compose logs command to grep the logs of each service and make sure it is active and did not throw any exceptions on startup.

docker-compose logs zookeeper | grep -i started

zookeeper_1 | [2022-11-12 10:24:47,459] INFO Started o.e.j.s.ServletContextHandler@7eecb5b8{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
zookeeper_1 | [2022-11-12 10:24:47,582] INFO Started ServerConnector@5b247367{HTTP/1.1, (http/1.1)}{} (org.eclipse.jetty.server.AbstractConnector)
zookeeper_1 | [2022-11-12 10:24:47,583] INFO Started @5327ms (org.eclipse.jetty.server.Server)
zookeeper_1 | [2022-11-12 10:24:47,586] INFO Started AdminServer on address, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
zookeeper_1 | [2022-11-12 10:24:47,785] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
docker-compose logs kafka-server-1 | grep -i started

kafka-server-1_1 | [2022-11-12 10:24:56,806] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Step 3: Manage the cluster with CMAK, a tool for managing Apache Kafka clusters

Open http://localhost:9000/ to access the Kafka Cluster Manager, where you can easily create a new cluster.

In the last image you can see that the cluster has 2 Kafka servers.

Example: create a topic with 2 partitions and 2 replicas

Step 4: Create a Spring Boot app with spring-kafka

Please check out this repo https://github.com/pub-technology/zookeeper-kafka-docker-springboot/tree/main/demo for more details.

4.1 Add spring-kafka dependency

  • Add the spring-kafka dependency to our pom.xml

4.2 Create Kafka topic programmatically

We can now easily create topics programmatically with the AdminClient support in spring-kafka. In this demo I’ll create a new topic named product using KafkaAdmin.

package com.example.kafka;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    // KafkaAdmin creates any NewTopic bean that does not exist yet at startup
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // Topic "product" with 1 partition and a replication factor of 1
    @Bean
    public NewTopic productTopic() {
        return new NewTopic("product", 1, (short) 1);
    }
}


After the application starts, this configuration runs automatically, and if the topic does not exist, KafkaAdmin creates it.

In the Kafka manager we can see that our topic was created.

4.3 Create producer & send message to topic

Now we need to configure the Kafka workflow for sending and listening to messages.

We need a ProducerFactory to create producer instances, and a KafkaTemplate that uses those producer instances to send messages to the topic.

package com.example.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Serializable> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Allow requests of up to 20 MB (20971520 bytes)
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        return new DefaultKafkaProducerFactory<>(configProps, new StringSerializer(), new JsonSerializer<>());
    }

    @Bean
    KafkaTemplate<String, Serializable> jsonKafkaTemplate(ProducerFactory<String, Serializable> jsonProducerFactory) {
        return new KafkaTemplate<>(jsonProducerFactory);
    }
}

This producer configuration lets our app send JSON messages instead of plain String messages.

Next, we create a ProductProducer that sends messages to the topic.

package com.example.kafka.producers;

import com.example.requests.Product;
import com.example.requests.ProductMessage;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.io.Serializable;

@Slf4j
@Component
public class ProductProducer {
    final String productTopic = "product";

    private KafkaTemplate<String, Serializable> kafkaTemplate;

    public ProductProducer(KafkaTemplate<String, Serializable> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(ProductMessage message) {
        // send() is asynchronous; in spring-kafka 2.x it returns a ListenableFuture
        ListenableFuture<SendResult<String, Serializable>> future = kafkaTemplate.send(productTopic, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Serializable>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send message = {} due to: {}", message.toString(), ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Serializable> result) {
                log.info("Message sent successfully with offset = {}", result.getRecordMetadata().offset());
            }
        });
    }
}

In our service we use ProductProducer to send messages to the topic.

package com.example.services;

import com.example.kafka.producers.ProductProducer;
import com.example.requests.Product;
import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ProductService {
    private ProductProducer productProducer;

    public ProductService(ProductProducer productProducer) {
        this.productProducer = productProducer;
    }

    public void sendMessage(ProductMessage message) {
        log.info("[ProductService] send product to topic");
        productProducer.send(message);
    }
}


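The underlying Kafka producer is thread safe, so a single ProductProducer instance can be shared across threads. Sharing one instance generally gives better performance than creating several, and sending messages continuously is not a problem.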
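
The sharing pattern can be sketched without a running broker. In the plain-Java example below, CountingSender is a hypothetical stand-in for ProductProducer that only counts calls instead of talking to Kafka; the point is that several worker threads use the same instance rather than creating one each.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedSenderDemo {

    // Hypothetical stand-in for ProductProducer: counts sends instead of calling Kafka.
    static class CountingSender {
        private final AtomicInteger sent = new AtomicInteger();

        void send(String message) {
            sent.incrementAndGet();
        }

        int sentCount() {
            return sent.get();
        }
    }

    // Every worker thread sends through the same sender instance.
    static int sendConcurrently(CountingSender sender, int threads, int messagesPerThread)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int worker = t;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    sender.send("product-" + worker + "-" + i);
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return sender.sentCount();
    }

    public static void main(String[] args) throws InterruptedException {
        int total = sendConcurrently(new CountingSender(), 4, 1000);
        System.out.println("messages sent: " + total); // prints "messages sent: 4000"
    }
}
```

In the real application the Spring container plays the role of this demo: it creates one ProductProducer bean and injects that same instance wherever it is needed.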

4.4 Create consumer & consuming messages from topic

First we create a KafkaConsumerConfig class that configures a ConsumerFactory and a listener container factory.

package com.example.kafka;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, ProductMessage> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");

        // Deserialize JSON payloads into ProductMessage
        JsonDeserializer<ProductMessage> jsonDeserializer = new JsonDeserializer<>(ProductMessage.class);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ProductMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // The listener containers need a consumer factory to create their consumers
        factory.setConsumerFactory(consumerFactory());
        log.info("Configure concurrent consumer Kafka");
        return factory;
    }
}

Next, we create a ProductListener that listens for messages from the topic.

package com.example.kafka.listeners;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ProductListener {
    @KafkaListener(topics = "product", containerFactory = "kafkaListenerContainerFactory")
    public void newProductListener(ProductMessage product) {
        log.info("Get request from product topic " + product.toString());
    }
}


  • Check out the project and cd into the demo application
  • Start the Spring Boot application
  • Trigger the add-product API by running `curl/product-add.sh` from the root of the demo folder




