Introduction to Apache Kafka
Apache Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation. Kafka is used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and highly available.
Prerequisites
- JDK 17 or later
- Maven or Gradle
- IDE (IntelliJ IDEA, Eclipse, etc.)
- Apache Kafka server (You can run Kafka using Docker)
Step 1: Set Up Apache Kafka Server
You can run Kafka using Docker with the following commands:
docker network create kafka-net
docker run -d --net=kafka-net --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper
docker run -d --net=kafka-net --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
This command will start Kafka and Zookeeper in Docker containers.
Step 2: Create the Projects
We'll create two Spring Boot projects: producer-service
and consumer-service
.
Step 3: Set Up producer-service
3.1 Create the Project
Use Spring Initializr to create a new project with the following dependencies:
- Spring Web
- Spring Boot Actuator
- Spring for Apache Kafka
3.2 Configure application.properties
Set up the application properties for producer-service
.
spring.application.name=producer-service
spring.kafka.bootstrap-servers=localhost:9092
Explanation:
spring.application.name=producer-service
: Names the application.spring.kafka.bootstrap-servers=localhost:9092
: Specifies the Kafka server address.
3.3 Configure Kafka Producer
Create a configuration class to define the Kafka producer settings.
package com.example.producerservice;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Explanation:
@Configuration
: Indicates that this class contains Spring configuration.@Bean
: Marks this method as a bean producer.ProducerFactory<String, String>
: Configures the Kafka producer factory with server address and serializers.KafkaTemplate<String, String>
: Creates a Kafka template for sending messages.
3.4 Create a Message Producer
Create a service to send messages to Kafka.
package com.example.producerservice;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
private static final String TOPIC = "exampleTopic";
private final KafkaTemplate<String, String> kafkaTemplate;
public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
Explanation:
@Service
: Marks this class as a service component.KafkaTemplate<String, String> kafkaTemplate
: Injects theKafkaTemplate
for sending messages to Kafka.sendMessage(String message)
: Sends a message to theexampleTopic
.
3.5 Create a Controller
Create a controller to handle HTTP requests and send messages to Kafka.
package com.example.producerservice;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
private final MessageProducer messageProducer;
public MessageController(MessageProducer messageProducer) {
this.messageProducer = messageProducer;
}
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
messageProducer.sendMessage(message);
return "Message sent: " + message;
}
}
Explanation:
@RestController
: Marks this class as a REST controller.@GetMapping("/send")
: Maps GET requests to/send
to this method.@RequestParam String message
: Extracts themessage
parameter from the request.messageProducer.sendMessage(message)
: Sends the message to Kafka.
Step 4: Set Up consumer-service
4.1 Create the Project
Use Spring Initializr to create a new project with the following dependencies:
- Spring Web
- Spring Boot Actuator
- Spring for Apache Kafka
4.2 Configure application.properties
Set up the application properties for consumer-service
.
spring.application.name=consumer-service
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
Explanation:
spring.application.name=consumer-service
: Names the application.spring.kafka.bootstrap-servers=localhost:9092
: Specifies the Kafka server address.spring.kafka.consumer.group-id=group_id
: Specifies the Kafka consumer group ID.
4.3 Configure Kafka Consumer
Create a configuration class to define the Kafka consumer settings.
package com.example.consumerservice;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Explanation:
@EnableKafka
: Enables Kafka listener annotations.@Configuration
: Indicates that this class contains Spring configuration.@Bean
: Marks this method as a bean producer.ConsumerFactory<String, String>
: Configures the Kafka consumer factory with server address and deserializers.ConcurrentKafkaListenerContainerFactory<String, String>
: Creates a Kafka listener container factory.
4.4 Create a Message Listener
Create a service to listen to messages from Kafka.
package com.example.consumerservice;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class MessageListener {
@KafkaListener(topics = "exampleTopic", groupId = "group_id")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
Explanation:
@Service
: Marks this class as a service component.@KafkaListener(topics = "exampleTopic", groupId = "group_id")
: Annotates a method to listen to messages from theexampleTopic
.listen(String message)
: Processes the received message and prints it to the console.
Step 5: Run the Microservices
- Start Kafka and Zookeeper: Ensure Kafka and Zookeeper are running using the Docker commands mentioned above.
- Start
producer-service
: Run theProducerServiceApplication
class. - Start
consumer-service
: Run theConsumerServiceApplication
class.
Step 6: Test the Communication
-
Open your browser or use a tool like Postman to send a GET request to
producer-service
:- URL:
http://localhost:8081/send?message=Hello
- This will send the message "Hello" to the
exampleTopic
.
- URL:
-
Check the console logs of
consumer-service
to see the received message:- You should see Received message: Hello in the logs.
Conclusion
You have successfully set up two Spring Boot microservices that communicate asynchronously using Apache Kafka. This setup allows you to decouple the services and enable asynchronous processing, which can improve the scalability and resilience of your system. This example can be expanded to include more complex message handling, additional microservices, and advanced Kafka configurations.
Comments
Post a Comment