Currently, there are two ways to write to and read from Kafka: the Producer and Consumer APIs, or Kafka Streams. With the Producer and Consumer APIs, data is written to Kafka once as discrete records and read back record by record; Kafka Streams instead consumes, transforms, and republishes data as a continuous stream. This article describes the use of the Producer and Consumer APIs for storing and retrieving data; Kafka Streams targets continuous stream-processing workloads.
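For contrast, a minimal Kafka Streams topology is sketched below. This is not part of the producer/consumer example that follows; the topic names input-topic and output-topic and the class name StreamDemo are made up for illustration (the kafka-streams dependency is already declared in the pom.xml of step 5):

package com.kafka.client;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class StreamDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Continuously read from one topic, transform each value, write to another
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}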
1. Create the object for JSON serialization and deserialization
package com.kafka.client.object;

import java.util.StringTokenizer;

public class Contact {

    private int contactId;
    private String firstName;
    private String lastName;

    public Contact() {
    }

    public Contact(int contactId, String firstName, String lastName) {
        this.contactId = contactId;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // Populate the fields from a comma-separated string, e.g. "1,John,Doe"
    public void parseString(String csvStr) {
        StringTokenizer st = new StringTokenizer(csvStr, ",");
        contactId = Integer.parseInt(st.nextToken());
        firstName = st.nextToken();
        lastName = st.nextToken();
    }

    public int getContactId() {
        return contactId;
    }

    public void setContactId(int contactId) {
        this.contactId = contactId;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Contact{" +
                "contactId=" + contactId +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}
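Before wiring this class into Kafka, it can help to verify the Jackson round trip on its own. Below is a minimal sketch (the class name ContactJsonCheck is made up for illustration); valueToTree and treeToValue are the same ObjectMapper calls the producer and consumer use later:

package com.kafka.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafka.client.object.Contact;

public class ContactJsonCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Contact contact = new Contact(1, "Bububombo", "Tekateka");

        // Object -> JsonNode: what the producer will send
        JsonNode node = mapper.valueToTree(contact);
        System.out.println(node); // {"contactId":1,"firstName":"Bububombo","lastName":"Tekateka"}

        // JsonNode -> Object: what the consumer will reconstruct
        Contact back = mapper.treeToValue(node, Contact.class);
        System.out.println(back);
    }
}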
2. Write into Kafka using a Producer
package com.kafka.client;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafka.client.object.Contact;

public class Producer {

    public static void produce(String brokers, String topicName) throws IOException {

        // Set properties used to configure the producer
        Properties properties = new Properties();

        // Set the brokers (bootstrap servers)
        properties.setProperty("bootstrap.servers", brokers);

        // Set how to serialize key/value pairs
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.connect.json.JsonSerializer");

        // Specify the protocol for SSL encryption; this is needed for secure clusters
        //properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(properties);
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            Contact contact = new Contact();
            contact.setContactId(1);
            contact.setFirstName("Bububombo");
            contact.setLastName("Tekateka");

            // Convert the object to a JsonNode so the JsonSerializer can handle it
            JsonNode jsonNode = objectMapper.valueToTree(contact);
            ProducerRecord<String, JsonNode> rec = new ProducerRecord<>(topicName, jsonNode);
            producer.send(rec);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
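Note that producer.send() is asynchronous, so the code above fires and forgets. If delivery confirmation is wanted, send() also accepts a callback; a sketch of that variant (not part of the original code) would replace the send call above:

// Send with a delivery callback: the lambda runs once the broker
// acknowledges the record or the send fails.
producer.send(rec, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Delivered to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
producer.flush(); // block until all buffered records are actually sent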
3. Read data from Kafka using a Consumer with manual offset commit
package com.kafka.client;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafka.client.object.Contact;

public class Consumer {

    public static int consume(String brokers, String groupId, String topicName) {

        // Configure the consumer
        Properties properties = new Properties();

        // Point it to the brokers
        properties.setProperty("bootstrap.servers", brokers);

        // Set the consumer group (all consumers must belong to a group)
        properties.setProperty("group.id", groupId);

        // Set how to deserialize key/value pairs
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");

        // When a group is first created, it has no offset stored to start reading from.
        // This tells it to start with the earliest record in the stream.
        properties.setProperty("auto.offset.reset", "earliest");

        // Disable auto-commit so offsets are committed manually below
        properties.setProperty("enable.auto.commit", "false");

        KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<>(properties);

        int count = 0;
        try {
            Duration interval = Duration.ofMinutes(2);
            consumer.subscribe(Arrays.asList(topicName));
            ObjectMapper mapper = new ObjectMapper();

            while (true) {
                // Poll for records
                ConsumerRecords<String, JsonNode> records = consumer.poll(interval);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, JsonNode>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, JsonNode> record : partitionRecords) {
                        count += 1;
                        JsonNode jsonNode = record.value();
                        System.out.println(mapper.treeToValue(jsonNode, Contact.class));
                        System.out.printf(count + ": offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                    // Commit the offset just past the last record processed in this partition
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
        return count;
    }
}
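commitSync() blocks until the broker acknowledges the offsets, which is the safest option. If commit latency matters more, commitAsync() with a callback is the usual non-blocking alternative; a sketch of the swap for the commitSync line above:

// Non-blocking commit: the callback reports failures instead of throwing
consumer.commitAsync(
        Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)),
        (offsets, exception) -> {
            if (exception != null) {
                System.err.println("Offset commit failed for " + offsets + ": " + exception);
            }
        });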
4. To run the code, create a runner class:
package com.kafka.client;

import java.io.IOException;
import java.util.UUID;

// Handle starting producer or consumer
public class Run {

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            usage();
        }

        // Get the topic and the brokers
        String topicName = args[1];
        String brokers = args[2];

        switch (args[0].toLowerCase()) {
            case "producer":
                Producer.produce(brokers, topicName);
                break;
            case "consumer":
                // Either a groupId was passed in, or we need a random one
                String groupId;
                if (args.length == 4) {
                    groupId = args[3];
                } else {
                    groupId = UUID.randomUUID().toString();
                }
                Consumer.consume(brokers, groupId, topicName);
                break;
            default:
                usage();
        }
        System.exit(0);
    }

    // Display usage
    public static void usage() {
        System.out.println("Usage:");
        System.out.println("kafka-0.0.1.jar <producer|consumer> topicname brokerhosts [groupid]");
        System.exit(1);
    }
}
5. Package the classes into a jar via pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1</version>
<packaging>jar</packaging>
<name>kafka-client</name>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencies>
<!-- Kafka client for producer/consumer operations -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.kafka.client.Run</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
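Build with the standard Maven command. The copy-dependencies execution above places the dependency jars in target/lib, and the jar plugin writes target/kafka-0.0.1.jar with com.kafka.client.Run as the main class:

mvn clean package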
6. Run the jar with the dependency libraries:
JAVAPATH=/usr/bin
CLS=/home/kafka-client/kafka-0.0.1.jar
CLLIB=/home/kafka-client/lib

$JAVAPATH/java -Xms128m -Xmx256m -classpath "$CLS:$CLLIB/*" com.kafka.client.Run consumer newtopic localhost:9092 0

$JAVAPATH/java -Xms128m -Xmx256m -classpath "$CLS:$CLLIB/*" com.kafka.client.Run producer newtopic localhost:9092
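The topic must already exist unless the broker has auto-creation enabled. For Kafka 2.0 it can be created with the script bundled with the broker, assuming ZooKeeper runs on localhost:2181:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newtopic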