Kafka Producer and Consumer Java JSON Examples

There are two main ways to write to and read from Kafka in Java: the plain producer/consumer API, and Kafka Streams. With the producer and consumer API, an application explicitly sends records to a topic and polls them back, which fits discrete data exchange such as the JSON contacts in this article. Kafka Streams, by contrast, is a client library for building continuous stream-processing applications that consume from topics, transform records, and write results back to Kafka. This article describes the producer and consumer API; a minimal Streams sketch follows just for contrast.
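For orientation only, here is a minimal, hypothetical Kafka Streams sketch (the topic name "newtopic" matches the run commands in step 6, and the kafka-streams dependency is already in the pom.xml of step 5); it is not part of the example built in this article:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StreamSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "contact-stream-demo"); // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        StreamsBuilder builder = new StreamsBuilder();
        // Continuously print every record that arrives on the topic
        builder.stream("newtopic").foreach((key, value) -> System.out.println(key + " -> " + value));
        new KafkaStreams(builder.build(), props).start();
    }
}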

1. Create the object for JSON Serialization and Deserialization

package com.kafka.client;
import java.util.StringTokenizer;

public class Contact {

    private int contactId;
    private String firstName;
    private String lastName;

    // No-argument constructor, required by Jackson when deserializing JSON back into a Contact
    public Contact() {
    }
    public Contact(int contactId, String firstName, String lastName) {
        this.contactId = contactId;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // Populate the fields from a comma-separated string, e.g. "1,Bububombo,Tekateka"
    public void parseString(String csvStr){
        StringTokenizer st = new StringTokenizer(csvStr,",");
        contactId = Integer.parseInt(st.nextToken());
        firstName = st.nextToken();
        lastName = st.nextToken();
    }


    public int getContactId() {
        return contactId;
    }

    public void setContactId(int contactId) {
        this.contactId = contactId;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Contact{" +
                "contactId=" + contactId +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }	
}
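Before wiring Jackson into Kafka, it can help to check the JSON round trip in isolation. This small, hypothetical check class uses the same two ObjectMapper calls that the producer and consumer below rely on (valueToTree and treeToValue):

package com.kafka.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ContactJsonCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Contact contact = new Contact(1, "Bububombo", "Tekateka");
        // Serialize to a Jackson tree, as the producer below does
        JsonNode jsonNode = mapper.valueToTree(contact);
        System.out.println(jsonNode); // {"contactId":1,"firstName":"Bububombo","lastName":"Tekateka"}
        // Deserialize back, as the consumer below does (this needs the no-arg constructor and setters)
        Contact back = mapper.treeToValue(jsonNode, Contact.class);
        System.out.println(back);
    }
}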

2. Write into Kafka using Producer

package com.kafka.client;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;
import java.io.IOException;

public class Producer
{
    public static void produce(String brokers, String topicName) throws IOException
    {

        // Set properties used to configure the producer
        Properties properties = new Properties();
        // Set the brokers (bootstrap servers)
        properties.setProperty("bootstrap.servers", brokers);
        // Set how to serialize key/value pairs
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer","org.apache.kafka.connect.json.JsonSerializer");
        // Specify the security protocol; this is needed for secure clusters
        // (requires: import org.apache.kafka.clients.CommonClientConfigs;)
        //properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(properties);
        ObjectMapper objectMapper = new ObjectMapper();
        
        try {
            Contact contact = new Contact();
            contact.setContactId(1);
            contact.setFirstName("Bububombo");
            contact.setLastName("Tekateka");
            JsonNode jsonNode = objectMapper.valueToTree(contact);
            ProducerRecord<String, JsonNode> rec = new ProducerRecord<>(topicName, jsonNode);
            producer.send(rec);
            
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
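Note that send() is asynchronous: it returns immediately and delivery happens in the background (close() flushes anything still pending). To confirm delivery per record, a callback can be passed as a second argument, a small variation on the send above:

            // Variation: attach a callback that reports delivery success or failure
            producer.send(rec, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("Delivered to " + metadata.topic()
                            + " partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });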

3. Read the data from Kafka Using Consumer with Manual Offset Commit

package com.kafka.client;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class Consumer {
    public static int consume(String brokers, String groupId, String topicName) {
        // Create a consumer
        KafkaConsumer<String, JsonNode> consumer;
        // Configure the consumer
        Properties properties = new Properties();
        // Point it to the brokers
        properties.setProperty("bootstrap.servers", brokers);
        // Set the consumer group (all consumers must belong to a group).
        properties.setProperty("group.id", groupId);
        // Set how to deserialize key/value pairs
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.connect.json.JsonDeserializer");
       
        // When a group is first created, it has no offset stored to start reading from. This tells it to start
        // with the earliest record in the stream.
        properties.setProperty("auto.offset.reset","earliest");
        properties.setProperty("enable.auto.commit", "false");
        
        consumer = new KafkaConsumer<>(properties);
        int count = 0;
        try {
            // poll() blocks for up to this long while waiting for new records
            Duration interval = Duration.ofMinutes(2);
            consumer.subscribe(Arrays.asList(topicName));
            ObjectMapper mapper = new ObjectMapper();
            while (true) {
                // Poll for records
                ConsumerRecords<String, JsonNode> records = consumer.poll(interval);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, JsonNode>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, JsonNode> record : partitionRecords) {
                        count += 1;
                        JsonNode jsonNode = record.value();
                        System.out.println(mapper.treeToValue(jsonNode, Contact.class));
                        System.out.printf(count + ": offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                    // Commit after the last record processed in this partition; the committed
                    // offset is the position of the next record to read, hence lastOffset + 1
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
        return count;
    }
}
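As written, the loop only exits (and the count is only returned) when poll() throws. A common pattern for a clean shutdown, shown here as a sketch rather than as part of the original example, is to call wakeup() from another thread, for instance a JVM shutdown hook registered before the loop:

        // Sketch: wakeup() is the one thread-safe KafkaConsumer method; calling it
        // makes a blocked poll() throw WakeupException, which falls into the catch
        // block above so that consumer.close() still runs in finally
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));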

4. To run the codes, create a runner class:

package com.kafka.client;

import java.io.IOException;
import java.util.UUID;

// Handle starting producer or consumer
public class Run {
    public static void main(String[] args) throws IOException {
        if(args.length < 3) {
            usage();
        }
        // Argument order: <producer|consumer> <topicname> <brokerhosts> [groupid]
        String topicName = args[1];
        String brokers = args[2];
        switch(args[0].toLowerCase()) {
            case "producer":
                Producer.produce(brokers, topicName);
                break;
            case "consumer":
                // Either a groupId was passed in, or we need a random one
                String groupId;
                if(args.length == 4) {
                    groupId = args[3];
                } else {
                    groupId = UUID.randomUUID().toString();
                }
                Consumer.consume(brokers, groupId, topicName);
                break;
            default:
                usage();
        }
        System.exit(0);
    }
    // Display usage
    public static void usage() {
        System.out.println("Usage:");
        System.out.println("kafka-0.0.1.jar <producer|consumer>  brokerhosts [groupid]");
        System.exit(1);
    }
}

5. Package the classes into a jar via pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com</groupId>

<artifactId>kafka</artifactId>

<version>0.0.1</version>

<packaging>jar</packaging>

<name>kafka-client</name>

<properties>

<maven.compiler.target>1.8</maven.compiler.target>

<maven.compiler.source>1.8</maven.compiler.source>

</properties>

<dependencies>

<!-- Kafka client for producer/consumer operations -->

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.0.0</version>

</dependency>

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>connect-json</artifactId>

    <version>2.0.0</version>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-streams</artifactId>

<version>2.0.0</version>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<!-- Build an executable JAR -->

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-jar-plugin</artifactId>

<configuration>

<archive>

<manifest>

<mainClass>com.kafka.client.Run</mainClass>

</manifest>

</archive>

</configuration>

</plugin>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-dependency-plugin</artifactId>

<executions>

<execution>

<id>copy-dependencies</id>

<phase>prepare-package</phase>

<goals>

<goal>copy-dependencies</goal>

</goals>

<configuration>

<outputDirectory>${project.build.directory}/lib</outputDirectory>

<overWriteReleases>false</overWriteReleases>

<overWriteSnapshots>false</overWriteSnapshots>

<overWriteIfNewer>true</overWriteIfNewer>

</configuration>

</execution>

</executions>

</plugin>

</plugins>

</build>

</project>

6. Run the jar with its dependency libraries:

JAVAPATH=/usr/bin
CLS=/home/kafka-client/kafka-0.0.1.jar
CLLIB=/home/kafka-client/lib

# Write one contact to the topic "newtopic"
$JAVAPATH/java -Xms128m -Xmx256m -classpath "$CLS:$CLLIB/*" com.kafka.client.Run producer newtopic localhost:9092

# Read it back with consumer group "0"
$JAVAPATH/java -Xms128m -Xmx256m -classpath "$CLS:$CLLIB/*" com.kafka.client.Run consumer newtopic localhost:9092 0
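Both commands assume the topic "newtopic" already exists or that the broker auto-creates topics. If neither holds, it can be created with the tool that ships with Kafka 2.0:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newtopic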
