Kafka Consumer-Producer App
Kafka Consumer-Producer App Example with code
Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system, written in Java and Scala, developed by the Apache Software Foundation.
On this page I show a simple Maven project that demonstrates how to build a producer app and a consumer app for the Kafka messaging system.
This is a very simple demo of a Kafka consumer and producer app.
GitHub link: https://github.com/Bibhuti5/KafkaConsumer-Producer
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the Kafka consumer/producer demo.
  Inherits from spring-boot-starter-parent, targets Java 17, and declares the
  Kafka and Spring Boot devtools dependencies used by the example classes.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <groupId>com.cg.school</groupId>
  <artifactId>Kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>Kafka</name>
  <description>Trainning project on school demo</description>

  <properties>
    <java.version>17</java.version>
  </properties>

  <dependencies>
    <!-- Kafka artifact that supplies the org.apache.kafka classes imported by the example code -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.13</artifactId>
      <version>3.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
Consumer
package com.cg.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Minimal Kafka consumer demo.
 *
 * <p>Connects to the broker at {@code localhost:9092} as a member of consumer
 * group {@code test}, subscribes to {@code firstTopic}, and prints every
 * record it receives to standard output until the process is stopped or an
 * exception is thrown.
 */
public class ConsumerApp {

    /** Topic this demo subscribes to. */
    private static final String TOPIC = "firstTopic";

    /** Timeout passed to each {@code poll} call. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    /**
     * Runs the polling loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Step 1: set the mandatory properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Topics to subscribe to.
        List<String> topicList = new ArrayList<>();
        topicList.add(TOPIC);

        // try-with-resources closes the consumer on every exit path, including
        // a failure inside subscribe(), which previously ran outside the try.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(topicList);
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("****" + record.toString());
                }
            }
        } catch (Exception ex) {
            // Demo-level handling: report the failure and let main() return.
            ex.printStackTrace();
        }
    }
}
Producer
package com.cg.producers;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal Kafka producer demo.
 *
 * <p>Connects to the broker at {@code localhost:9092} and sends eleven string
 * records ({@code "test value210"} through {@code "test value220"}) to
 * {@code firstTopic}, printing a line to standard output for each record
 * handed to the producer.
 */
public class KafkaApplication {

    /** Topic the demo records are sent to. */
    private static final String TOPIC = "firstTopic";

    /**
     * Sends the demo records and then closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Step 1: set the mandatory properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer on every exit path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 210; i <= 220; i++) {
                final int recordNumber = i; // effectively-final copy for the callback
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, "test value" + i);

                // send() is asynchronous: delivery failures are reported through
                // the callback, not thrown here, so without it they would be lost.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send record number " + recordNumber);
                        exception.printStackTrace();
                    }
                });
                System.out.println("Producer record number send" + i);
            }
        } catch (Exception ex) {
            // Demo-level handling: report the failure and let main() return.
            ex.printStackTrace();
        }
    }
}
Comments