Spark Application with Kafka Consumer and HBase

1. Maven Project pom.xml.
Import all required dependencies and package the dependencies and resources into a single jar. This is the best option; otherwise you will have problems trying to get the dependencies onto the Spark classpath:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com</groupId>
  <artifactId>spark</artifactId>
  <version>0.0.1</version>
  <packaging>jar</packaging>

  <name>spark-app</name>
  <url>http://maven.apache.org</url>

  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId>
         <version>2.3.1</version> </dependency> -->

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.1.0</version>
      <type>jar</type>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-json</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20180130</version>
      <type>jar</type>
    </dependency>

    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.11.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.11.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
              <overWriteReleases>false</overWriteReleases>
              <overWriteSnapshots>false</overWriteSnapshots>
              <overWriteIfNewer>true</overWriteIfNewer>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
            <configuration>
              <archive>
                <manifest>
                  <mainClass>com.spark.kafka.sparkRun</mainClass>
                </manifest>
              </archive>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
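
Build the fat jar with Maven before submitting. With the assembly plugin configured above, the default output in target/ is spark-0.0.1-jar-with-dependencies.jar; rename it (or set a finalName in the assembly configuration) if you prefer the shorter spark-0.0.1-full.jar name used in step 6:

mvn clean package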

2. Create a runnable class to start the process:

package com.spark.kafka;

import java.util.UUID;

import com.spark.hbase.hbaseTable;
import com.spark.kafka.Consumer;
import com.spark.kafka.Producer;

public class sparkRun {

	public static void main(String[] args) {
		try {
			if (args.length < 5) {
				usage();
			}
			// Read the positional arguments
			String topicName = args[1];
			String tcolumn = args[2];
			String jsondata = args[3];
			String brokers = args[4];
			switch (args[0].toLowerCase()) {
			case "producer":
				// read from hbase, write to producer
				Producer.produce(brokers, topicName, jsondata);
				break;
			case "consumer":
				// read from consumer, write to hbase
				String groupId;
				if (args.length == 6) {
					groupId = args[5];
				} else {
					groupId = UUID.randomUUID().toString();
				}
				Consumer.consume(brokers, groupId, topicName);
				break;
			case "describe":
				hbaseTable.DescribeTable(topicName);
				break;
			case "create":
				hbaseTable.CreateTable(topicName, tcolumn);
				break;
			case "delete":
				hbaseTable.DeleteTable(topicName);
				break;
			default:
				usage();
			}
			System.exit(0);
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}

	// Display usage
	public static void usage() {
		System.out.println("Usage:");
		System.out.println(
				"spark-0.0.1.jar <producer|consumer|create|describe|delete> <topicname> <column> <jsondata> <brokerhosts> [groupid]");
		System.exit(1);
	}

}
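
The Producer and hbaseTable helper classes referenced above are not listed in this post. A minimal Producer sketch, assuming the JSON payload is published as a plain string value (which the JsonDeserializer on the consumer side can then parse into a JsonNode), could look like this:

package com.spark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

	public static void produce(String brokers, String topicName, String jsondata) {
		// Point the producer at the brokers and send the JSON payload as a plain string
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", brokers);
		properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
			// Send a single record and flush so it reaches the brokers before the process exits
			producer.send(new ProducerRecord<>(topicName, jsondata));
			producer.flush();
		}
	}
}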

3. Create a consumer class to read from Kafka:

package com.spark.kafka;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import com.fasterxml.jackson.databind.JsonNode;
import com.spark.hbase.hbaseRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Consumer {
	
	private static final Logger LOGGER = LogManager.getLogger(Consumer.class.getName());
	
	public static int consume(String brokers, String groupId, String topicName) {
		// Create a consumer
		KafkaConsumer<String, JsonNode> consumer;
		// Configure the consumer
		Properties properties = new Properties();
		// Point it to the brokers
		properties.setProperty("bootstrap.servers", brokers);
		// Set the consumer group (all consumers must belong to a group).
		properties.setProperty("group.id", groupId);
		// Set how to serialize key/value pairs
		properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		properties.setProperty("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");

		// When a group is first created, it has no offset stored to start reading from.
		// This tells it to start
		// with the earliest record in the stream.
		properties.setProperty("auto.offset.reset", "earliest");
		properties.setProperty("enable.auto.commit", "false");

		consumer = new KafkaConsumer<>(properties);
		int count = 0;
		try {
			Duration interval = Duration.ofMinutes(2);
			consumer.subscribe(Arrays.asList(topicName));
			hbaseRecord r = new hbaseRecord();
			LOGGER.debug("Kafka Consumer Started");
			while (true) {
				// Poll for records
				ConsumerRecords<String, JsonNode> records = consumer.poll(interval);
				for (TopicPartition partition : records.partitions()) {
					List<ConsumerRecord<String, JsonNode>> partitionRecords = records.records(partition);
					LOGGER.debug("Kafka partition records:" + partitionRecords.size());
					for (ConsumerRecord<String, JsonNode> record : partitionRecords) {
						LOGGER.debug("Kafka record:" + record.value());
						count += 1;
						JsonNode jsonNode = record.value();
						LOGGER.debug("{}: offset = {}, key = {}, value = {}", count, record.offset(), record.key(),
								record.value());
						r.saveupdateColumn(topicName, "mykey1", "contacts", jsonNode.toString());

					}
					long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
					consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
				}
			}
		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			consumer.close();
		}
		return count;
	}
}

4. Create an HBase class to store the data in the table:

package com.spark.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class hbaseRecord {

	Configuration conf = HBaseConfiguration.create();
	String cfamily = "jsondata";
	private static final Logger LOGGER = LogManager.getLogger(hbaseRecord.class.getName());

	public hbaseRecord() {
		conf.set("hbase.zookeeper.quorum", "localhost");
		conf.set("zookeeper.znode.parent", "/hbase");
		conf.set("hbase.master", "localhost:16000");
		conf.set("hbase.zookeeper.property.clientPort", "2181");
	}

	public boolean saveupdateColumn(String tname, String keyid, String tcolumn, String Json) throws IOException {

		Connection connection = null;
		Table table = null;
		boolean success = false;

		try {
			byte[] family = Bytes.toBytes(cfamily); // name of column family
			connection = ConnectionFactory.createConnection(conf);
			TableName tableName = TableName.valueOf(tname);
			table = connection.getTable(tableName);

			List<Put> puts = new ArrayList<>();
			byte[] key = Bytes.toBytes(keyid);
			LOGGER.debug("Family:" + family + ":Col:" + tcolumn + ":Data:" + Json);
			Put p = new Put(key);
			p.addColumn(family, Bytes.toBytes(tcolumn), Bytes.toBytes(Json));
			puts.add(p);
			table.put(puts);
			puts.clear();
			LOGGER.debug("Processed");
			success = true;

		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			if (table != null) {
				table.close();
			}
			if (connection != null) {
				connection.close();
			}
			LOGGER.debug("Close connection completed");
		}
		return success;
	}

	public String readColumnByKey(String tname, String keyid, String tcolumn) throws IOException {

		Connection connection = null;
		Table table = null;
		String res = "";

		try {
			connection = ConnectionFactory.createConnection(conf);
			TableName tableName = TableName.valueOf(tname);
			table = connection.getTable(tableName);

			byte[] key = Bytes.toBytes(keyid);
			Get g = new Get(key);
			Result r = table.get(g);
			res = r.toString();
			LOGGER.debug("Processed");

		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			if (table != null) {
				table.close();
			}
			if (connection != null) {
				connection.close();
			}
		}
		return res;
	}

	public List<String> readRecords(String tname, String keyid, String tcolumn) throws IOException {

		Connection connection = null;
		Table table = null;
		List<String> results = new ArrayList<>();

		try {
			connection = ConnectionFactory.createConnection(conf);
			TableName tableName = TableName.valueOf(tname);
			table = connection.getTable(tableName);

			Scan scan = new Scan();
			ResultScanner scanner = table.getScanner(scan);
			for (Result sr : scanner) {
				results.add(sr.toString());
				LOGGER.debug("Scan: " + sr);
			}
			scanner.close();
			LOGGER.debug("Processed");

		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			if (table != null) {
				table.close();
			}
			if (connection != null) {
				connection.close();
			}
		}
		return results;
	}
}
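
The hbaseTable class used by the create, describe and delete commands is also not shown; a minimal sketch against the HBase 2.1 Admin API, assuming the same localhost connection settings as hbaseRecord, might look like this:

package com.spark.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class hbaseTable {

	private static final Configuration conf = HBaseConfiguration.create();

	static {
		conf.set("hbase.zookeeper.quorum", "localhost");
		conf.set("zookeeper.znode.parent", "/hbase");
		conf.set("hbase.master", "localhost:16000");
		conf.set("hbase.zookeeper.property.clientPort", "2181");
	}

	// Create a table with a single column family
	public static void CreateTable(String tname, String tcolumn) throws Exception {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				Admin admin = connection.getAdmin()) {
			admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tname))
					.setColumnFamily(ColumnFamilyDescriptorBuilder.of(tcolumn)).build());
		}
	}

	// Print the table descriptor
	public static void DescribeTable(String tname) throws Exception {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				Admin admin = connection.getAdmin()) {
			System.out.println(admin.getDescriptor(TableName.valueOf(tname)));
		}
	}

	// Disable and drop the table
	public static void DeleteTable(String tname) throws Exception {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				Admin admin = connection.getAdmin()) {
			admin.disableTable(TableName.valueOf(tname));
			admin.deleteTable(TableName.valueOf(tname));
		}
	}
}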

5. Ensure the table is created in HBase using the HBase shell before running the Spark application.
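
A minimal example in the HBase shell, assuming the table name matches the topic used in step 6 (newtopic) and the jsondata column family hard-coded in hbaseRecord:

hbase shell
create 'newtopic', 'jsondata'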

6. Run the Spark application:

spark-submit --class com.spark.kafka.sparkRun --master yarn --deploy-mode cluster spark-0.0.1-full.jar consumer newtopic "" "" localhost:9092 0

The positional arguments are: the command (consumer), the topic/table name (newtopic), the column and JSON-data arguments (unused on the consumer path), the Kafka brokers (localhost:9092), and the consumer group id (0).
