In this article, we will show you how to set up and run Apache Kafka locally to speed up your development.
Step 1: Set up Docker
docker-compose.yml configuration
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./pub-technology/zookeeper/data:/data
      - ./pub-technology/zookeeper/datalog:/datalog
  kafka-server-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./pub-technology/kafka1/data:/var/lib/kafka/data
  kafka-server-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-server-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./pub-technology/kafka2/data:/var/lib/kafka/data
  manager:
    image: sheepkiller/kafka-manager
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - zookeeper
The configuration file defines 4 services: zookeeper, kafka-server-1, kafka-server-2, and manager. ZooKeeper coordinates the Kafka cluster, the 2 Kafka nodes form the cluster, and Kafka Manager gives us a web UI for the demonstration.
- The ZooKeeper server must start before the 2 Kafka servers and Kafka Manager, so each of those services declares depends_on: zookeeper. Note that depends_on controls start order only; it does not wait for ZooKeeper to be ready.
- ZooKeeper listens on port 2181 inside the Docker network, which is the address the Kafka servers use (zookeeper:2181). For clients running on the host it is exposed on port 22181 through the mapping 22181:2181.
- The 2 Kafka nodes are exposed to host applications on ports 29092 and 39092. Inside the Docker network each broker is advertised on port 9092, as configured in the environment variable KAFKA_ADVERTISED_LISTENERS.
- Service names and KAFKA_BROKER_ID values must be unique across the services.
- Kafka Manager runs on port 9000.
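To make the two advertised addresses easier to see, here is a minimal sketch that splits a KAFKA_ADVERTISED_LISTENERS value into its listener names and addresses. The class name AdvertisedListeners and its parse method are hypothetical helpers written for this article; they are not part of Kafka or the Confluent image.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AdvertisedListeners {

    /** Splits a KAFKA_ADVERTISED_LISTENERS value into listener name -> host:port. */
    public static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            // Each entry has the form NAME://host:port
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Value taken from the kafka-server-1 service in docker-compose.yml
        String value = "PLAINTEXT://kafka-server-1:9092,PLAINTEXT_HOST://localhost:29092";
        parse(value).forEach((name, address) ->
                System.out.println(name + " -> " + address));
    }
}
```

For kafka-server-1 this shows that containers on the Docker network should connect to kafka-server-1:9092, while an application on the host should use localhost:29092.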
Step 2: Start application
Start our containers using the docker-compose command:
docker-compose up -d
Creating network "hello-kafka-server_default" with the default driver
Creating hello-kafka-server_zookeeper_1 ... done
Creating hello-kafka-server_kafka-server-1_1 ... done
Creating hello-kafka-server_kafka-server-2_1 ... done
Creating hello-kafka-server_manager_1... done
Make sure all services started successfully and verify that each one is listening on the port given in the configuration.
Use the netcat command (nc) to open a connection to each port:
- Zookeeper ( port 22181 )
- Kafka Node #1 ( port 29092 )
- Kafka Node #2 ( port 39092 )
- Kafka Manager ( port 9000 )
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 22181
Connection to localhost port 22181 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 29092
Connection to localhost port 29092 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 39092
Connection to localhost port 39092 [tcp/*] succeeded!
(base) haithai@FVFV2H hello-kafka-server % nc -z localhost 9000
Connection to localhost port 9000 [tcp/cslistener] succeeded!
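If netcat is not available, the same check can be sketched in plain Java. This is only an illustration using the JDK; the class name PortCheck and the 1000 ms timeout are assumptions chosen for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat the port as closed
            return false;
        }
    }

    public static void main(String[] args) {
        // Host ports published in docker-compose.yml
        int[] ports = {22181, 29092, 39092, 9000};
        for (int port : ports) {
            System.out.println("localhost:" + port + " open = " + isOpen("localhost", port, 1000));
        }
    }
}
```

Like nc -z, this only proves that something accepts TCP connections on the port, not that the service behind it is healthy.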
Use the docker-compose logs command with grep to check that each service is active and did not throw any exception when it started.
docker-compose logs zookeeper | grep -i started
zookeeper_1 | [2022-11-12 10:24:47,459] INFO Started o.e.j.s.ServletContextHandler@7eecb5b8{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
zookeeper_1 | [2022-11-12 10:24:47,582] INFO Started ServerConnector@5b247367{HTTP/1.1, (http/1.1)}{0.0.0.0:8080} (org.eclipse.jetty.server.AbstractConnector)
zookeeper_1 | [2022-11-12 10:24:47,583] INFO Started @5327ms (org.eclipse.jetty.server.Server)
zookeeper_1 | [2022-11-12 10:24:47,586] INFO Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands (org.apache.zookeeper.server.admin.JettyAdminServer)
zookeeper_1 | [2022-11-12 10:24:47,785] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
docker-compose logs kafka-server-1 | grep -i started
kafka-server-1_1 | [2022-11-12 10:24:56,806] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
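The same log check can be automated. The sketch below mimics grep -i over a list of log lines and applies a simple heuristic: a service looks healthy if it logged "started" and no line mentions "exception". The class name LogScan and this heuristic are assumptions made for illustration, so adjust them to your own logs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class LogScan {

    /** Case-insensitive substring filter, the same idea as grep -i. */
    public static List<String> grepIgnoreCase(List<String> lines, String needle) {
        String lowerNeedle = needle.toLowerCase(Locale.ROOT);
        return lines.stream()
                .filter(line -> line.toLowerCase(Locale.ROOT).contains(lowerNeedle))
                .collect(Collectors.toList());
    }

    /** Heuristic: at least one "started" line and no line mentioning "exception". */
    public static boolean looksHealthy(List<String> lines) {
        return !grepIgnoreCase(lines, "started").isEmpty()
                && grepIgnoreCase(lines, "exception").isEmpty();
    }

    public static void main(String[] args) {
        // Log line copied from the kafka-server-1 output above
        List<String> lines = Arrays.asList(
                "kafka-server-1_1 | [2022-11-12 10:24:56,806] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)");
        System.out.println("healthy = " + looksHealthy(lines));
    }
}
```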
Step 3: Manage the cluster with CMAK
CMAK (Cluster Manager for Apache Kafka) is a tool for managing Apache Kafka clusters.
Open http://localhost:9000/ to access the Kafka cluster manager, where you can easily add a new cluster.
Once the cluster is added, the manager shows that it has 2 Kafka servers.
As an example, create a topic with 2 partitions and a replication factor of 2.
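A replication factor of 2 works here because the cluster has 2 brokers; Kafka rejects a topic whose replication factor is larger than the number of available brokers. The following sketch checks topic settings before you submit them. TopicSizing and validate are hypothetical names for this article, not a Kafka or CMAK API.

```java
import java.util.Optional;

public class TopicSizing {

    /** Returns an empty Optional when the settings are acceptable, otherwise a reason. */
    public static Optional<String> validate(int partitions, int replicationFactor, int brokerCount) {
        if (partitions < 1) {
            return Optional.of("partitions must be at least 1");
        }
        if (replicationFactor < 1) {
            return Optional.of("replication factor must be at least 1");
        }
        if (replicationFactor > brokerCount) {
            // Each replica of a partition must live on a different broker
            return Optional.of("replication factor " + replicationFactor
                    + " exceeds the " + brokerCount + " available broker(s)");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(2, 2, 2).orElse("ok"));
        System.out.println(validate(2, 3, 2).orElse("ok"));
    }
}
```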
Step 4: Create a Spring Boot app with spring-kafka
Please check out this repo https://github.com/pub-technology/zookeeper-kafka-docker-springboot/tree/main/demo for more details.
4.1 Add spring-kafka dependency
- Add the spring-kafka dependency to our pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.2</version>
</dependency>
4.2 Create Kafka topic programmatically
Spring Kafka makes it easy to create topics programmatically: a KafkaAdmin bean uses the Kafka AdminClient to create the NewTopic beans defined in the application context. In this demo I'll create a new topic named product with KafkaAdmin.
package com.example.kafka;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic productTopic() {
        return new NewTopic("product", 1, (short) 1);
    }
}
After the application starts, this configuration runs automatically, and if the topic doesn't exist, KafkaAdmin creates it.
In Kafka Manager we can see that our topic was created.
4.3 Create producer & send message to topic
Next we configure how the application sends messages to Kafka and listens for them.
We need a ProducerFactory to create producer instances, and a KafkaTemplate that uses those producers to send messages to the topic.
package com.example.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Serializable> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // Keys are sent as plain strings, values as JSON
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Allow requests of up to 20 MiB (20 * 1024 * 1024 bytes)
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        return new DefaultKafkaProducerFactory<>(configProps, new StringSerializer(), new JsonSerializer<>());
    }

    @Bean
    KafkaTemplate<String, Serializable> jsonKafkaTemplate(ProducerFactory<String, Serializable> jsonProducerFactory) {
        return new KafkaTemplate<>(jsonProducerFactory);
    }
}
This producer configuration lets our app send JSON messages instead of plain String messages.
We will create a ProductProducer to send messages to the topic.
package com.example.kafka.producers;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.io.Serializable;

@Slf4j
@Component
public class ProductProducer {

    final String productTopic = "product";

    private final KafkaTemplate<String, Serializable> kafkaTemplate;

    @Autowired
    public ProductProducer(KafkaTemplate<String, Serializable> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(ProductMessage message) {
        // send() is asynchronous; the callback reports the outcome later
        ListenableFuture<SendResult<String, Serializable>> future = kafkaTemplate.send(productTopic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Serializable>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send message = {} due to: {}", message.toString(), ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Serializable> result) {
                log.info("Message sent successfully with offset = {}", result.getRecordMetadata().offset());
            }
        });
    }
}
In our service we will use ProductProducer to send messages to the topic.
package com.example.services;

import com.example.kafka.producers.ProductProducer;
import com.example.requests.Product;
import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProductService {

    private ProductProducer productProducer;

    @Autowired
    public ProductService(ProductProducer productProducer) {
        this.productProducer = productProducer;
    }

    public void sendMessage(ProductMessage message) {
        log.info("[ProductService] send product to topic");
        productProducer.send(message);
    }
}
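The ProductProducer instance is thread-safe, because the Kafka producer underneath it is thread-safe. A single shared instance can therefore send messages continuously from many threads, and sharing one producer generally performs better than creating several.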
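The combination of one shared sender, many calling threads, and a callback per message can be illustrated with the JDK alone. In this sketch FakeSender is a hypothetical stand-in that hands out increasing offsets; it is not a Kafka or spring-kafka class, and CompletableFuture is used instead of ListenableFuture only to keep the example free of dependencies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SharedSenderDemo {

    /** Hypothetical stand-in for a thread-safe producer; not a Kafka class. */
    static class FakeSender {
        private final AtomicLong nextOffset = new AtomicLong();

        CompletableFuture<Long> send(String topic, String message) {
            if (message == null) {
                CompletableFuture<Long> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalArgumentException("message is null"));
                return failed;
            }
            // AtomicLong keeps offsets unique even under concurrent calls
            return CompletableFuture.supplyAsync(nextOffset::getAndIncrement);
        }
    }

    /** Sends messageCount messages from several threads through one shared sender. */
    public static long sendAll(int threads, int messageCount) {
        FakeSender sender = new FakeSender();
        AtomicLong succeeded = new AtomicLong();
        CountDownLatch done = new CountDownLatch(messageCount);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messageCount; i++) {
            String message = "product-" + i;
            pool.execute(() -> {
                // The callback runs once the asynchronous send has finished
                sender.send("product", message).whenComplete((offset, ex) -> {
                    if (ex == null) {
                        succeeded.incrementAndGet();
                    }
                    done.countDown();
                });
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for sends", e);
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        System.out.println("succeeded = " + sendAll(4, 100));
    }
}
```

The caller never blocks on an individual send; success and failure are handled in the callback, which is the same shape as the onSuccess and onFailure methods in ProductProducer.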
4.4 Create consumer & consuming messages from topic
First we create a KafkaConsumerConfig class that configures a ConsumerFactory and a listener container factory.
package com.example.kafka;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, ProductMessage> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");

        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        Map<String, Class<?>> classMap = new HashMap<>();
        typeMapper.setIdClassMapping(classMap);
        typeMapper.addTrustedPackages("*");

        JsonDeserializer<ProductMessage> jsonDeserializer = new JsonDeserializer<>(ProductMessage.class);
        jsonDeserializer.setTypeMapper(typeMapper);
        jsonDeserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), jsonDeserializer);
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consuming");
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ProductMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ProductMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        log.info("Configure concurrent consumer Kafka");
        return factory;
    }
}
Next, we will create a ProductListener to listen for messages from the topic.
package com.example.kafka.listeners;

import com.example.requests.ProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ProductListener {

    @KafkaListener(topics = "product", containerFactory = "kafkaListenerContainerFactory")
    public void newProductListener(ProductMessage product) {
        log.info("Get request from product topic " + product.toString());
    }
}
Demo
- Check out the project and cd into the demo application
- Start the Spring Boot application
- Trigger the add-product API by running `curl/product-add.sh` from the root of the demo folder