Zookeeper & Kafka with Docker and Spring Boot

Kobe
Nov 13, 2022


In this article, we will show you how to set up and run Apache Kafka locally to speed up your development.

Zookeeper & Kafka Communication

Step 1: Set up Docker

docker-compose.yml configuration

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./pub-technology/zookeeper/data:/data
      - ./pub-technology/zookeeper/datalog:/datalog

  kafka-server-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./pub-technology/kafka1/data:/var/lib/kafka/data

  kafka-server-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./pub-technology/kafka2/data:/var/lib/kafka/data

  manager:
    image: sheepkiller/kafka-manager
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - zookeeper

The configuration file defines a Zookeeper server that coordinates our Kafka brokers. In this setup we start one Zookeeper server and 2 Kafka nodes, plus a Kafka manager to support our demonstration.

  • The docker-compose.yml file defines four services, namely zookeeper, kafka-server-1, kafka-server-2, and manager.
  • The Zookeeper server must always start before the two Kafka servers and the Kafka manager, so those services each declare a depends_on entry pointing at zookeeper.
  • The Zookeeper server listens on port 2181 inside the Docker network to manage our Kafka servers. Containers in the same network reach it on 2181, while any client running on the host uses port 22181, hence the 22181:2181 port mapping in the zookeeper config.
  • The two Kafka nodes are exposed to host applications on ports 29092 and 39092. Inside the Docker network, however, the brokers advertise themselves on port 9092, as configured by KAFKA_ADVERTISED_LISTENERS; the sketch after this list shows how a host client connects through these listeners.
  • Service names and KAFKA_BROKER_ID values are unique across the services.
  • The Kafka manager runs on port 9000.
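
To make the listener setup concrete, here is a minimal connectivity check, assuming the compose file above is running; it is not part of the demo repo. It connects a plain Java AdminClient from the host through the PLAINTEXT_HOST listeners and prints the brokers the cluster reports:

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Host-side bootstrap addresses, matching the PLAINTEXT_HOST port mappings
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:39092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Each broker reports back the advertised listener the client connected through
            admin.describeCluster().nodes().get().forEach(node ->
                    System.out.printf("Broker %d at %s:%d%n", node.id(), node.host(), node.port()));
        }
    }
}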

Step 2: Start the application

Start our containers using the docker-compose command:

docker-compose up -d

Creating network "hello-kafka-server_default" with the default driver
Creating hello-kafka-server_zookeeper_1 ... done
Creating hello-kafka-server_kafka-server-1_1 ... done
Creating hello-kafka-server_kafka-server-2_1 ... done
Creating hello-kafka-server_manager_1... done

Make sure all services started successfully and verify that each one is listening on exactly the port defined in the configuration.

Use the netcat command to test the connection to each port:

  • Zookeeper (port 22181)
  • Kafka Node #1 (port 29092)
  • Kafka Node #2 (port 39092)
  • Kafka Manager (port 9000)
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 22181
Connection to localhost port 22181 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 29092
Connection to localhost port 29092 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 39092
Connection to localhost port 39092 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 9000
Connection to localhost port 9000 [tcp/cslistener] succeeded!

Use the docker-compose logs command and grep to confirm each service is active and started without any exceptions.

docker-compose logs zookeeper | grep -i started

zookeeper_1 | [2022-11-12 10:24:47,459] INFO Started o.e.j.s.ServletContextHandler@7eecb5b8{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
zookeeper_1 | [2022-11-12 10:24:47,582] INFO Started ServerConnector@5b247367{HTTP/1.1, (http/1.1)}{0.0.0.0:8080} (org.eclipse.jetty.server.AbstractConnector)
zookeeper_1 | [2022-11-12 10:24:47,583] INFO Started @5327ms (org.eclipse.jetty.server.Server)
zookeeper_1 | [2022-11-12 10:24:47,586] INFO Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
zookeeper_1 | [2022-11-12 10:24:47,785] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
docker-compose logs kafka-server-1 | grep -i started

kafka-server-1_1 | [2022-11-12 10:24:56,806] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Step 3: Manage the cluster with CMAK

CMAK (Cluster Manager for Apache Kafka) is the tool behind the manager service. Open http://localhost:9000/ to access the Kafka cluster manager, where you can easily register a new cluster.

Once the cluster is registered, you can see that it contains the 2 Kafka brokers.

As a sample, create a topic with 2 partitions and 2 replicas.
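
If you prefer to script this step instead of clicking through the CMAK UI, the same topic can be created with the plain Kafka AdminClient. This is a sketch, not part of the demo repo, and the topic name demo-topic is illustrative:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDemoTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:39092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions spread across the brokers, replication factor 2 (one copy per broker)
            NewTopic topic = new NewTopic("demo-topic", 2, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}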

Step 4: Create a Spring Boot app with spring-kafka

Please check out this repo https://github.com/pub-technology/zookeeper-kafka-docker-springboot/tree/main/demo for more details.

4.1 Add spring-kafka dependency

  • Add the spring-kafka dependency to our pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.2</version>
</dependency>

4.2 Create Kafka topic programmatically

Now we can easily create topics programmatically with the AdminClient support in Spring Kafka. In this demo I'll create a new topic named product with KafkaAdmin. The kafka.bootstrapAddress property should point at the host listeners, e.g. localhost:29092,localhost:39092 for the compose setup above:

package com.example.kafka;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // NewTopic beans are picked up by KafkaAdmin and created on startup
    @Bean
    public NewTopic productTopic() {
        return new NewTopic("product", 1, (short) 1);
    }

}

After the application starts, this configuration runs automatically, and if the topic doesn't exist yet, KafkaAdmin will create it.
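
As a side note, spring-kafka also provides a TopicBuilder that expresses the same bean a little more fluently; a minimal sketch equivalent to the productTopic() bean above:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class ProductTopicConfig {

    // Equivalent to new NewTopic("product", 1, (short) 1)
    @Bean
    public NewTopic productTopic() {
        return TopicBuilder.name("product")
                .partitions(1)
                .replicas(1)
                .build();
    }
}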

In the Kafka manager we can see that our topic has been created.

4.3 Create a producer & send messages to the topic

Now we need to configure the messaging workflow: how Kafka messages are sent and listened to.

We need a ProducerFactory to generate producer instances, and a KafkaTemplate that uses those producers to send messages to the topic.

package com.example.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Serializable> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        // String keys, JSON-serialized values
        return new DefaultKafkaProducerFactory<>(configProps, new StringSerializer(), new JsonSerializer<>());
    }

    @Bean
    KafkaTemplate<String, Serializable> jsonKafkaTemplate(ProducerFactory<String, Serializable> jsonProducerFactory) {
        return new KafkaTemplate<>(jsonProducerFactory);
    }
}

This producer configuration gives our app the ability to send JSON messages instead of plain String messages.

Next we create a ProductProducer responsible for sending messages to the topic:

package com.example.kafka.producers;

import com.example.requests.ProductMessage;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.io.Serializable;

@Slf4j
@NoArgsConstructor
@Component
public class ProductProducer {

    final String productTopic = "product";

    private KafkaTemplate<String, Serializable> kafkaTemplate;

    @Autowired
    public ProductProducer(KafkaTemplate<String, Serializable> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(ProductMessage message) {
        // Sending is asynchronous; the callback reports the outcome
        ListenableFuture<SendResult<String, Serializable>> future = kafkaTemplate.send(productTopic, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Serializable>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send message = {} due to: {}", message.toString(), ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Serializable> result) {
                log.info("Message sent successfully with offset = {}", result.getRecordMetadata().offset());
            }
        });
    }

}
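
The ProductMessage payload class lives in the demo repo and isn't reproduced here; a minimal Serializable POJO along these lines would satisfy the examples (the fields are illustrative, check the repo for the real class):

package com.example.requests;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

// Illustrative payload; see the demo repo for the actual definition
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductMessage implements Serializable {
    private String id;
    private String name;
    private double price;
}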

In our service we use the ProductProducer to send messages to the topic:

package com.example.services;

import com.example.kafka.producers.ProductProducer;
import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProductService {

    private ProductProducer productProducer;

    @Autowired
    public ProductService(ProductProducer productProducer) {
        this.productProducer = productProducer;
    }

    public void sendMessage(ProductMessage message) {
        log.info("[ProductService] send product to topic");
        productProducer.send(message);
    }

}

ProductProducer instances are thread-safe (the underlying KafkaTemplate is designed for concurrent use), which gives higher performance; we don't need to worry about problems when sending messages continuously.
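
To drive this from HTTP, a minimal controller might look like the sketch below; the /product path, the package name, and the response wording are assumptions, so the controller in the demo repo may differ:

package com.example.controllers;

import com.example.requests.ProductMessage;
import com.example.services.ProductService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical REST entry point for the add-product flow
@RestController
public class ProductController {

    private final ProductService productService;

    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    @PostMapping("/product")
    public ResponseEntity<String> addProduct(@RequestBody ProductMessage message) {
        productService.sendMessage(message);
        // The send itself is asynchronous, so we only acknowledge that it was queued
        return ResponseEntity.accepted().body("Product message queued");
    }
}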

4.4 Create a consumer & consume messages from the topic

First we need a consumer configuration that sets up a ConsumerFactory and a listener container factory:

package com.example.kafka;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, ProductMessage> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");

        // Trust all packages so the JSON deserializer can map incoming payloads
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        Map<String, Class<?>> classMap = new HashMap<>();
        typeMapper.setIdClassMapping(classMap);
        typeMapper.addTrustedPackages("*");

        JsonDeserializer<ProductMessage> jsonDeserializer = new JsonDeserializer<>(ProductMessage.class);
        jsonDeserializer.setTypeMapper(typeMapper);
        jsonDeserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), jsonDeserializer);
    }

    // Alternative String-only consumer settings; not used by the factory below
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ProductMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // factory.setConcurrency(n) could be called here to run n consumer threads
        log.info("Configure concurrent consumer Kafka");
        return factory;
    }
}

Next, we create a ProductListener that listens for messages from the topic:

package com.example.kafka.listeners;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ProductListener {

    @KafkaListener(topics = "product", containerFactory = "kafkaListenerContainerFactory")
    public void newProductListener(ProductMessage product) {
        log.info("Get request from product topic " + product.toString());
    }
}

Demo

  • Check out the project and cd into the demo application.
  • Start the Spring Boot application.
  • Trigger the add-product API with `curl/product-add.sh` from the root demo folder; the ProductListener should then log "Get request from product topic ..." in the console.
