Wednesday, July 20, 2016

How to Create a Kafka simple Producer (Java)

package packagename;

import java.io.IOException;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

public static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);

TestProducer(long events, String topic, String fileID_gen, String a_broker) throws IOException {

//sfileID_gen="random";

Properties props = new Properties();
//props.put("metadata.broker.list", "localhost:9092");
//props.put("metadata.broker.list", "localhost:9092");
LOG.info("setting metadata.broker.list ="+a_broker);
props.put("metadata.broker.list", a_broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "packagename.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);




String msg="";
int index=0;

for (long nEvents = 0; nEvents < events; nEvents++) {


if (fileID_gen.equals("random"))
{
msg = "\""+randomChar()+"\""+ randomLorT()+"\",\"numRecords\":165848,\"hasErrors\":" + randomBoolean() + "}";
}

KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg.trim());

LOG.info("Generated data is " + msg);

LOG.info("waiting to generate file details");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
LOG.error("thread sleep error",e);
}

producer.send(data);

}// for-close


producer.close();
}// constructor-close

public boolean randomBoolean() {
return Math.random() < 0.5;
}

public String randomChar() {
Random r = new Random();
char random_char = (char) (r.nextInt(26) + 'a');
//Character.toString(random_char);
return Character.toString(random_char);
}

public String randomLorT()
{
final String alphabet = "T";
   final int N = alphabet.length();
   char random_char = 0;
   Random r = new Random();

   for (int i = 0; i <= N ; i++) {
       random_char=alphabet.charAt(r.nextInt(N));
   }
 
   return Character.toString(random_char);

}



}// TestProducer-class-close




SimplePartitioner Class



package packagename;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
public SimplePartitioner(VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1))
% a_numPartitions;
}
return partition;
}

}

Dependency:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>

Apache Drill JDBC Example


package packagename;

import java.sql.*;

/**
 * Runs one aggregate query against Apache Drill over JDBC and prints the two summed
 * columns of every result row, tab-separated.
 */
public class DrillJDBCExample1 {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";

    // You can get this URL from Drill Explorer, if you have it installed.
    // The ZooKeeper port is 5181 on MapR Hadoop.
    static final String DB_URL = "jdbc:drill:zk=machineName:5181/drill/x-drillbits";

    /** Query executed by {@link #main(String[])}; the plugin and table names are placeholders. */
    private static final String SQL =
            "select transaction_week_end,SUM(SumOfSale) as sq,SUM(SumOfPrice) as sp from dfs.`StoragepluginName`.`drillTableName` GROUP BY `transaction_week_end`";

    /**
     * Loads the Drill driver, runs the query and prints the {@code sq} and {@code sp} columns.
     * Failures are reported on stderr; nothing is rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            Class.forName(JDBC_DRIVER);

            // try-with-resources closes ResultSet, Statement and Connection exactly once,
            // in reverse order, whether or not the query succeeds.
            try (Connection conn = DriverManager.getConnection(DB_URL, " ", " ");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(SQL)) {

                while (rs.next()) {
                    System.out.print(rs.getBigDecimal("sq") + "\t");
                    System.out.print(rs.getBigDecimal("sp") + "\n");
                }
            }
        } catch (SQLException se) {
            // Errors from JDBC (connect, query, fetch, close)
            se.printStackTrace();
        } catch (Exception e) {
            // Errors from Class.forName and anything unexpected
            e.printStackTrace();
        }
    }
}

How to Package a Maven Project and ftp the jar files to another server, from Eclipse in single step (from Windows Machine)


In most of the projects I have worked on (Hadoop or Spark projects),

I end up developing my code (mostly Java) on a Windows machine (as a Maven project, with my dependency jars added) and FTPing the jars to the Hadoop or Spark cluster.

I found an easy way to package and FTP in a single step.




Write a build.bat

REM Package the project; stop here if the build fails so a stale jar is never uploaded.
call "c:\Program Files\Apache\Maven\bin\mvn" package
if errorlevel 1 exit /b 1

REM Build the WinSCP script. The first line uses ">" (not ">>") so that a leftover
REM ftpcmd.dat from an aborted run is overwritten instead of appended to.
REM Replace the whole <ServerName-where-you-need-Jar> placeholder, including the angle brackets.
echo open sftp://UserName:Passwd@<ServerName-where-you-need-Jar> -hostkey="server's hostkey" > ftpcmd.dat
echo cd /targer/path >> ftpcmd.dat
echo put "c:\workspace\ProjectName\target\project-0.0.1-SNAPSHOT.jar" >> ftpcmd.dat
echo put "c:\workspace\ProjectName\target\project-0.0.1-SNAPSHOT-jar-with-dependencies.jar" >> ftpcmd.dat
echo exit >> ftpcmd.dat
winscp.com /script=ftpcmd.dat

REM The script file contains the password, so always remove it.
del ftpcmd.dat

Click Run -> External Tools (bottom most menu)-> External Tool Configuration

or






After that, you can click this configuration in the Run menu whenever you want to package and FTP in a single step.

Tip: I FTP the fat jar (with dependencies) only once, and then put a REM (REMARK - comments out a line in a batch file) in front of the line that uploads the jar with dependencies.

Also comment out the build plugins in pom.xml (as shown below).

Then I rename the class (so it won't conflict with the class in the fat jar) and FTP only the jar without dependencies.

I pass both jars to the program, so I avoid the time needed to package and FTP the complete fat jar, and I can quickly check my changes on the cluster and develop faster :)

<!--
<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>mainClass Name</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>

<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

</plugins>
</build>

-->

Partition a parquet file using Apache Spark



// Reads a parquet data set and rewrites it partitioned by one column.
SparkConf spconf = new SparkConf();
spconf.set("spark.driver.maxResultSize", "3g");

// try-with-resources: the context is closed on every path, and a failure while creating it
// is reported as-is instead of being masked by a NullPointerException from sc.close().
try (JavaSparkContext sc = new JavaSparkContext(spconf)) {

    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    // Alternative when Hive support is needed:
    // HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

    DataFrame df = sqlContext.read().parquet("/path/hive/warehouse/nextgen_rjr.db/source.parquet");

    df.printSchema();

    // To partition by several columns instead:
    // df.write().partitionBy("col_A","col_B").parquet("/targetPath/source_partitioned.parquet");
    df.write().partitionBy("col_A").parquet("/targetPath/source_partitioned_by_single_col.parquet");
}

// Only reached when the write succeeded.
System.out.println("Partitioned Parquet File created");

Apache Spark: read from Hbase table and process the data and create Hive Table directly


package packagename;

import java.io.IOException;
import java.io.Serializable;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;

import scala.Tuple2;


public class SparkCreateSomeJoinedTableInHiveFromHbase {

public static class Item implements Serializable {

private static final long serialVersionUID = 3926135063167745103L;
private String rowKey;
private String iUUID;

public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getiUUID() {
return iUUID;
}
public void setiUUID(String iUUID) {
this.iUUID = iUUID;
}


  }

public static class Org implements Serializable {
private static final long serialVersionUID = -7380406345952297820L;
private String rowKeyOrg;
 private String oUUID;

   public String getRowKeyOrg() {
return rowKeyOrg;
}
public void setRowKeyOrg(String rowKeyOrg) {
this.rowKeyOrg = rowKeyOrg;
}
public String getoUUID() {
return oUUID;
}
public void setoUUID(String oUUID) {
this.oUUID = oUUID;
}


  }

public static void main(String[] args) throws IOException {
SparkConf spconf = new SparkConf();

spconf.set("spark.driver.maxResultSize", "3g");

JavaSparkContext sc=null;
Configuration itemsConfig = null;
Configuration orgConfig = null;
try
{
sc = new JavaSparkContext(spconf);
Configuration config = null;
try {
      config = HBaseConfiguration.create();
      System.out.println("HBase is running!");
    }
catch (Exception ce){
       ce.printStackTrace();
}


config.set(TableInputFormat.INPUT_TABLE, "/sourcetablePath/tableName");
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "x"); // column family
config.set(TableInputFormat.SCAN_COLUMNS, "x:y"); // 3 column qualifiers
//config.set(TableInputFormat.SCAN_ROW_START, "M|");
//config.set(TableInputFormat.SCAN_ROW_STOP, "M|~"); //RowKeyPattern



JavaPairRDD<ImmutableBytesWritable, Result> data = sc.newAPIHadoopRDD(config, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);


PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String> pairFunc =
       new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

String m =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("x"), Bytes.toBytes("y")));
String rowKey =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
     
     
       if (m != null && rowKey != null) {
           return new Tuple2<String, String>(rowKey,m);
       }
       else {
           return null;
       }
   }
};


JavaPairRDD<String, String> pairRdd = data.mapToPair(pairFunc);

// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

sqlContext.setConf("hive.exec.dynamic.partition", "true");
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");



DataFrame df = sqlContext.read().json(pairRdd.values());

DataFrame newDF = df.select(df.col("A").as("apple"),
     ,
//explode JSON      org.apache.spark.sql.functions.explode(df.col("JSONString")).as("flatitem"));

DataFrame finalRequiredFormatDF= newDF.select(
 newDF.col("somecol"),
 newDF.col("someDate").cast(DataTypes.DateType).as("y_date"),
     (org.apache.spark.sql.functions.when(org.apache.spark.sql.functions.size(newDF.col("flatitem.errorCodes")).equalTo(0), "N").otherwise("Y")).as("someFlag"),
     org.apache.spark.sql.functions.concat_ws("|",
     newDF.col("a"),
     newDF.col("b"),
     newDF.col("c"),
     newDF.col("d"))
     .as("abcd")
   
     );
   

finalRequiredFormatDF.printSchema();


try {

itemsConfig = HBaseConfiguration.create();
   System.out.println("HBase is running! created configuration for xyz table");
    }
catch (Exception ce){
       ce.printStackTrace();
}


itemsConfig.set(TableInputFormat.INPUT_TABLE, "/xyztable");
itemsConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "m"); // column family
itemsConfig.set(TableInputFormat.SCAN_COLUMNS, "m:iUUID"); // column qualifiers


JavaPairRDD<ImmutableBytesWritable, Result> itemsData = sc.newAPIHadoopRDD(itemsConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item> itemsFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Item> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Item item = new Item();

String iUUID =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("m"), Bytes.toBytes("iUUID")));
String rowKey =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
       
        item.setRowKey(rowKey);
        item.setiUUID(iUUID);
       
        return new Tuple2<String, Item>(rowKey,item);
     
   }
};


JavaPairRDD<String,Item> itemsPairRdd = itemsData.mapToPair(itemsFunc);
DataFrame itemsDf = sqlContext.createDataFrame(itemsPairRdd.values(), Item.class);

itemsDf.printSchema();

try {

orgConfig = HBaseConfiguration.create();
      System.out.println("HBase is running! created configuration for outlets table");
    }
catch (Exception ce){
       ce.printStackTrace();
}



orgConfig.set(TableInputFormat.INPUT_TABLE, "/someZtable");
orgConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "z"); // column family
orgConfig.set(TableInputFormat.SCAN_COLUMNS, "z:oUUID"); // column qualifiers

JavaPairRDD<ImmutableBytesWritable, Result> orgData = sc.newAPIHadoopRDD(orgConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org> orgFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Org> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Org org = new Org();

String oUUID =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("z"), Bytes.toBytes("oUUID")));
String rowKeyOrg =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
       
        org.setRowKeyOrg(rowKeyOrg);
        org.setoUUID(oUUID);
       
        return new Tuple2<String, Org>(rowKeyOrg,org);
     
   }
};


JavaPairRDD<String,Org> orgPairRdd = orgData.mapToPair(orgFunc);

DataFrame orgDf = sqlContext.createDataFrame(orgPairRdd.values(), Org.class);

orgDf.printSchema();


DataFrame joinedDF=finalRequiredFormatDF.join(org.apache.spark.sql.functions.broadcast(itemsDf), finalRequiredFormatDF.col("item_id").equalTo(itemsDf.col("rowKey")),"LEFT").join(org.apache.spark.sql.functions.broadcast(orgDf),finalRequiredFormatDF.col("outlet_id").equalTo(orgDf.col("rowKeyOrg")));

//joinedDF.printSchema();

joinedDF.select(
)
.write().mode(SaveMode.Overwrite).saveAsTable("schemaName.targetHiveTableName");
//partition is not always supported by Hive

//.write().mode(SaveMode.Overwrite).partitionBy("transaction_date").saveAsTable("schemaName.targetHiveTableName");

//if you want to still partition write as a parquet file and create Hive External table (some of the partition functions are not supported by Hive), or create a view using Apache Drill on top of the parquet file

//.write().partitionBy("transaction_date").parquet("/mapr/airavata/dev/nextgen/rjr/transaction_line_items_full_partitioned_by_Date_equi_join.parquet");

}
finally
{
sc.close();
}

System.out.println("Hive Table created");
       
}




}





  hadoop_classpath=$(hadoop classpath)
  HBASE_CLASSPATH=$(hbase classpath)
  sudo -u userName /spark-1.5.2/bin/spark-submit   --name jobName     --class packagename.SparkCreateSomeJoinedTableInHiveFromHbase --master yarn   --deploy-mode client   --num-executors 8    --total-executor-cores 32    --executor-memory 2G   --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}"   --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}"    --conf "spark.executor.extraClassPath=${hadoop_classpath}"  --jars "/path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar"  /path/projectName-0.0.1-SNAPSHOT.jar


 



How to read a Parquet file and make a dataframe and create Hive temp table


package packagename;



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.thriftserver.*;



public class SparkReadParquetAndRegTempTable {


public static void main(String[] args) throws ClassNotFoundException {


SparkConf spconf = new SparkConf();

spconf.set("spark.driver.maxResultSize", "3g");

JavaSparkContext sc=null;
try
{
sc = new JavaSparkContext(spconf);

//SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());


DataFrame df = sqlContext.read().parquet("/path/parquetFolderName.parquet");


df.printSchema();


//To Query the table via beeline as Spark Hive table

df.registerTempTable("tempTable_spark");

HiveThriftServer2.startWithContext(sqlContext);

}
catch(Exception e)
{
System.out.print("Error is"+e.toString());
}



}

 


}


How to submit the job

hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)

# The command is one logical line: every line break needs a trailing backslash.
# --class must match the Java package exactly (it is "packagename", all lower case).
# Each --conf key may be given only once, so the two executor class paths are joined.
# Note: with --master local[4] the executor options below have no effect; they matter
# only when the master is switched to a cluster manager.
sudo -u userName /spark/spark-1.5.2/bin/spark-submit \
  --name tempSparkTable \
  --class packagename.SparkReadParquetAndRegTempTable \
  --master local[4] \
  --num-executors 8 \
  --executor-cores 8 \
  --executor-memory 8G \
  --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}" \
  --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}" \
  --jars /path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  /path/projectName-0.0.1-SNAPSHOT.jar

Make sure there is no Hive Thrift server already running on port 10000 on the machine where you run this program.


Connect via /opt/mapr/hive/hive-1.2/bin/beeline -u jdbc:hive2://serverName:10000 -n UserName

Show tables;

should list your table name in the list

tempTable_spark

Then you can run the queries against this temp table


Maven Dependencies:

<dependencies>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

 <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.12-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.12-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>0.98.12-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.4.1208</version>
</dependency>

<dependency>
    <groupId>com.databricks</groupId>
    <!-- Scala suffix must match the other Spark artifacts in this POM (_2.10);
         mixing _2.10 and _2.11 artifacts fails at runtime. -->
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

</dependencies>





How to create Spark Dataframe from (Read) PostgreSql and write processed data frame to PostgreSql/MySql

package com.packagename;


import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Loads a PostgreSQL table into a Spark DataFrame through the JDBC data source, prints a
 * sample, and writes the DataFrame back to PostgreSQL as a new table.
 */
public class SparkReadFromPostgres {

    /**
     * Reads {@code schema.tableName}, shows its schema and first 10 rows, then copies it to
     * the table {@code testTable}.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        Map<String, String> options = new HashMap<String, String>();

        options.put("url", "jdbc:postgresql://servername:5432/dbname");
        options.put("user", "username");
        options.put("password", "<somePassword>");
        // "driver" is the option key Spark's JDBC data source reads for the driver class.
        options.put("driver", "org.postgresql.Driver");
        options.put("dbtable", "schema.tableName");

        SparkConf spconf = new SparkConf();
        spconf.set("spark.driver.maxResultSize", "3g");

        // try-with-resources closes the context on every path and cannot NPE when the
        // context itself fails to start.
        try (JavaSparkContext sc = new JavaSparkContext(spconf)) {

            SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

            DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();

            jdbcDF.printSchema();
            jdbcDF.show(10);

            // If you want to create the table again in Postgres
            createPostGresTable(jdbcDF, "testTable");
        }
    }

    /**
     * Writes a DataFrame to PostgreSQL in {@code overwrite} mode.
     *
     * @param output            DataFrame to persist
     * @param postgresTableName destination table name
     */
    public static void createPostGresTable(DataFrame output, String postgresTableName) {
        String url = "jdbc:postgresql://serverName:5432/dbName";

        Properties props = new Properties();
        props.setProperty("user", "userName");
        props.setProperty("password", "password");
        //props.setProperty("ssl","true");
        // Same key as on the read side: "driver" names the JDBC driver class.
        props.setProperty("driver", "org.postgresql.Driver");

        output.write().mode("overwrite").jdbc(url, postgresTableName, props);
    }

}

Command to Run the Spark Program

hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)

# Each --conf key may be given only once (a repeated key overrides the earlier value),
# so all class-path entries are joined with ":" into one value per key.
PG_JAR=/sharedpath/postgresql-9.4.1208.jar

sudo -u userId /spark/spark-1.5.2/bin/spark-submit \
  --name SparkReadFromPostgres \
  --class com.packagename.SparkReadFromPostgres \
  --master yarn \
  --deploy-mode client \
  --num-executors 8 \
  --executor-cores 8 \
  --executor-memory 4G \
  --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}:${PG_JAR}" \
  --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}:${PG_JAR}" \
  --jars /projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  /projectName-0.0.1-SNAPSHOT.jar

pom.xml dependencies:

<dependencies>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

 <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.12-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.12-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>0.98.12-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.4.1208</version>
</dependency>

<dependency>
    <groupId>com.databricks</groupId>
    <!-- Scala suffix must match the other Spark artifacts in this POM (_2.10);
         mixing _2.10 and _2.11 artifacts fails at runtime. -->
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

</dependencies>