1. Maven Project pom.xml.
Import all required dependencies and package all dependencies and resources into a single jar; this is the simplest option, otherwise you will have problems getting the dependencies onto the Spark classpath. The pom is shown below, followed by a sample build command:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>spark</artifactId>
  <version>0.0.1</version>
  <packaging>jar</packaging>
  <name>spark-app</name>
  <url>http://maven.apache.org</url>

  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
    -->
    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.1.0</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-json</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20180130</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.11.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
              <overWriteReleases>false</overWriteReleases>
              <overWriteSnapshots>false</overWriteSnapshots>
              <overWriteIfNewer>true</overWriteIfNewer>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
            <configuration>
              <archive>
                <manifest>
                  <mainClass>com.spark.kafka.sparkRun</mainClass>
                </manifest>
              </archive>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
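With the assembly plugin bound to the package phase, a plain Maven build produces the fat jar. By default the jar-with-dependencies descriptor writes it to target/spark-0.0.1-jar-with-dependencies.jar; rename it (or set a finalName) if you prefer the shorter name used in step 6. A minimal sketch of the build step:

mvn clean package
# Default output of the jar-with-dependencies descriptor:
#   target/spark-0.0.1-jar-with-dependencies.jar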
2. Create a runnable class to start the process:
package com.spark.kafka;

import java.util.UUID;

import com.spark.hbase.hbaseTable;
import com.spark.kafka.Consumer;
import com.spark.kafka.Producer;

public class sparkRun {

    public static void main(String[] args) {
        try {
            // At least five arguments are required: command, topic/table, column, jsondata, brokers
            if (args.length < 5) {
                usage();
            }
            String topicName = args[1];
            String tcolumn = args[2];
            String jsondata = args[3];
            String brokers = args[4];
            switch (args[0].toLowerCase()) {
            case "producer":
                // Publish the JSON payload to the Kafka topic
                Producer.produce(brokers, topicName, jsondata);
                break;
            case "consumer":
                // Read from the Kafka topic and write to HBase
                String groupId;
                if (args.length == 6) {
                    groupId = args[5];
                } else {
                    groupId = UUID.randomUUID().toString();
                }
                Consumer.consume(brokers, groupId, topicName);
                break;
            case "describe":
                hbaseTable.DescribeTable(topicName);
                break;
            case "create":
                hbaseTable.CreateTable(topicName, tcolumn);
                break;
            case "delete":
                hbaseTable.DeleteTable(topicName);
                break;
            default:
                usage();
            }
            System.exit(0);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // Display usage
    public static void usage() {
        System.out.println("Usage:");
        System.out.println(
                "spark-0.0.1.jar <producer|consumer|create|delete|describe> <topic/table> <column> <jsondata> <brokerhosts> [groupid]");
        System.exit(1);
    }
}
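The main class dispatches to a Producer class that is not shown in this section. For reference, here is a minimal sketch of a compatible producer, assuming the JSON payload is sent as a plain string value (valid JSON bytes, which the consumer's JsonDeserializer can still parse):

package com.spark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void produce(String brokers, String topicName, String jsondata) {
        // Configure the producer to talk to the given brokers
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Send the JSON payload as the record value and flush before closing
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>(topicName, jsondata));
            producer.flush();
        }
    }
}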
3. Create a consumer class to read from Kafka:
package com.spark.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.JsonNode;
import com.spark.hbase.hbaseRecord;

public class Consumer {

    private static final Logger LOGGER = LogManager.getLogger(Consumer.class.getName());

    public static int consume(String brokers, String groupId, String topicName) {
        // Create a consumer
        KafkaConsumer<String, JsonNode> consumer;
        // Configure the consumer
        Properties properties = new Properties();
        // Point it to the brokers
        properties.setProperty("bootstrap.servers", brokers);
        // Set the consumer group (all consumers must belong to a group)
        properties.setProperty("group.id", groupId);
        // Set how to deserialize key/value pairs
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.connect.json.JsonDeserializer");
        // When a group is first created, it has no offset stored to start reading from.
        // This tells it to start with the earliest record in the stream.
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<>(properties);

        int count = 0;
        try {
            Duration interval = Duration.ofMinutes(2);
            consumer.subscribe(Arrays.asList(topicName));
            hbaseRecord r = new hbaseRecord();
            LOGGER.debug("Kafka Consumer Started");
            while (true) {
                // Poll for records
                ConsumerRecords<String, JsonNode> records = consumer.poll(interval);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, JsonNode>> partitionRecords = records.records(partition);
                    LOGGER.debug("Kafka partition records: " + partitionRecords.size());
                    for (ConsumerRecord<String, JsonNode> record : partitionRecords) {
                        LOGGER.debug("Kafka record: " + record.value());
                        count += 1;
                        JsonNode jsonNode = record.value();
                        LOGGER.debug("{}: offset = {}, key = {}, value = {}", count, record.offset(), record.key(),
                                record.value());
                        r.saveupdateColumn(topicName, "mykey1", "contacts", jsonNode.toString());
                    }
                    // Commit the offset of the last record processed in this partition batch
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
        return count;
    }
}
4. Create an HBase class to store the data in the table:
package com.spark.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class hbaseRecord {

    Configuration conf = HBaseConfiguration.create();
    String cfamily = "jsondata";
    private static final Logger LOGGER = LogManager.getLogger(hbaseRecord.class.getName());

    public hbaseRecord() {
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.master", "localhost:16000");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    public boolean saveupdateColumn(String tname, String keyid, String tcolumn, String json) throws IOException {
        Connection connection = null;
        Table table = null;
        boolean success = false;
        try {
            byte[] family = Bytes.toBytes(cfamily); // name of column family
            connection = ConnectionFactory.createConnection(conf);
            TableName tableName = TableName.valueOf(tname);
            table = connection.getTable(tableName);
            List<Put> puts = new ArrayList<>();
            byte[] key = Bytes.toBytes(keyid);
            LOGGER.debug("Family:" + cfamily + ":Col:" + tcolumn + ":Data:" + json);
            Put p = new Put(key);
            p.addColumn(family, Bytes.toBytes(tcolumn), Bytes.toBytes(json));
            puts.add(p);
            table.put(puts);
            puts.clear();
            LOGGER.debug("Processed");
            success = true;
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (table != null) {
                table.close();
            }
            if (connection != null) {
                connection.close();
            }
            LOGGER.debug("Close connection completed");
        }
        return success;
    }

    public String readColumnByKey(String tname, String keyid, String tcolumn) throws IOException {
        Connection connection = null;
        Table table = null;
        String res = "";
        try {
            connection = ConnectionFactory.createConnection(conf);
            TableName tableName = TableName.valueOf(tname);
            table = connection.getTable(tableName);
            byte[] key = Bytes.toBytes(keyid);
            Get g = new Get(key);
            Result r = table.get(g);
            res = r.toString();
            LOGGER.debug("Processed");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (table != null) {
                table.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return res;
    }

    public List<String> readRecords(String tname, String keyid, String tcolumn) throws IOException {
        Connection connection = null;
        Table table = null;
        List<String> results = new ArrayList<>();
        try {
            connection = ConnectionFactory.createConnection(conf);
            TableName tableName = TableName.valueOf(tname);
            table = connection.getTable(tableName);
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            for (Result sr : scanner) {
                results.add(sr.toString());
                LOGGER.debug("Scan: " + sr);
            }
            scanner.close();
            LOGGER.debug("Processed");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (table != null) {
                table.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return results;
    }
}
5. Ensure the table is created in HBase using the hbase shell before running the Spark app, for example as shown below.
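Assuming the table name newtopic used in step 6 and the jsondata column family hard-coded in hbaseRecord:

hbase shell
# From the hbase shell prompt:
create 'newtopic', 'jsondata'
describe 'newtopic'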
6. Run the Spark application:
spark-submit --class com.spark.kafka.sparkRun --master yarn --deploy-mode cluster spark-0.0.1-full.jar consumer newtopic "" "" localhost:9092 0
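To push a test record through the same jar, the producer mode takes the JSON payload as the fourth argument; the payload below is only an illustrative example:

spark-submit --class com.spark.kafka.sparkRun --master yarn --deploy-mode cluster spark-0.0.1-full.jar producer newtopic "" '{"id":"mykey1","name":"test"}' localhost:9092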