Thursday, September 14, 2017

How to do SSH forwarding from one machine to another machine

https://stackoverflow.com/questions/44768552/want-to-use-the-same-host-and-port-for-ssh-session

Wednesday, August 24, 2016

How to describe a hbase table with Column family and column qualifiers


package m7example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/*
 * command to run the program
java -cp  m7example-0.0.1-jar-with-dependencies.jar:`hbase classpath` m7example.RetrieveData2 <tableName>
 */

public class RetrieveData2{

   public static void main(String[] args) throws IOException, Exception{
 

  String tableName=null;

if(args.length>0)
{
if(args[0] != null)
{
System.out.println("Table Name is "+args[0]);
tableName=args[0];
}
}
else
{
System.out.println("Please input table name (with full path) as argument");
System.exit(0);
}



      // Instantiating Configuration class
      Configuration config = HBaseConfiguration.create();

      // Instantiating HTable class
      //HTable table = new HTable(config, "/path/students");

   
      HTable table = new HTable(config,tableName);

      // Instantiating configuration class
      Configuration conf = HBaseConfiguration.create();

      // Instantiating HBaseAdmin class
      HBaseAdmin admin = new HBaseAdmin(conf);

      // Verifying the existance of the table
      boolean bool = admin.tableExists(tableName);
   
      if(!bool)
      {
      System.out.println(tableName+" does n't exist");
      System.exit(0);
      }
   

   
   
       Scan scan = new Scan();
      //Scan scan = new Scan(Bytes.toBytes("000"),Bytes.toBytes("001")); //use start-row/stop-row if you want to limit the full table scan
       ResultScanner scanner = table.getScanner(scan);

 

ArrayList<String> cf =new ArrayList<String>();

System.out.println("Please be patient, I am doing a full table scan");
System.out.print("=");

for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {



System.out.print(".");

//String rowKey= Bytes.toString(rr.getRow());

Iterator<KeyValue> iter = rr.list().iterator();
while (iter.hasNext()) {
KeyValue kv = iter.next();

if(!cf.contains(Bytes.toString(kv.getFamily())))
{
cf.add(Bytes.toString(kv.getFamily()));
System.out.print("=");
}





}//while


}

ArrayList<String> qf[] = new ArrayList[cf.size()]; //Put the length of the array you need

   for(int x = 0; x < qf.length; x++){
       qf[x] = new ArrayList<>();
   }
 


scan = new Scan();
            scanner = table.getScanner(scan);

int index=0;

System.out.println("Scanning qualifiers of Column family ");
System.out.print("=");

for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {

index=0;

for(String y:cf)
{
//System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
//System.out.println("Scanning qualifiers of Column family " + y);
//System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");

//index=cf.indexOf(y);

for (String p: getColumnsOfCF(rr,y))
     {

    if(!qf[index].contains(p))
    {
    qf[index].add(p);
    System.out.print("=");
    }

     }

index++;
}

}


System.out.println("ColumnFamily and ColumnQualifiers of "+tableName);
System.out.println("============================================================================");
index=0;

for(index=0;index<cf.size();index++)
{
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
System.out.println(cf.get(index));
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");


for(String x:qf[index])
{
System.out.println(cf.get(index)+":"+x);
}

}

   
      table.close();
   }
 
   public static String[]  getColumnsOfCF(Result r, String ColumnFamily)
   {

         NavigableMap<byte[], byte[]> familyMap = r.getFamilyMap(Bytes.toBytes(ColumnFamily));
         String[] qfs = new String[familyMap.size()];

         int counter = 0;
         for(byte[] bytqfs : familyMap.keySet())
         {
             qfs [counter++] = Bytes.toString(bytqfs );

         }

         return qfs ;
       

   }
 
 
}


Pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mapr.support</groupId>
  <artifactId>m7example</artifactId>
  <version>0.0.1</version>


  <repositories>
<repository>
<id>mapr-maven</id>
<url>http://repository.mapr.com/maven</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
  </repositories>

 
  <dependencies>


<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.94.13-mapr-1401-m7-3.1.0</version>
  </dependency>

  </dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>5</source>
<target>5</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>m7example.M7Demo</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>


Shell script to call the java program

hbase-describe-All.sh
~~~~~~~~~~~~~~~~
#!/bin/bash

ARGS_EXPECTED=1

if [ $# -ne $ARGS_EXPECTED ]
then
  echo "[$HOSTNAME]: Usage: `basename $0` table_name(with full path)"
  exit 1
fi

java -cp  /path-to-jar/m7example-0.0.1-jar-with-dependencies.jar:`hbase classpath` m7example.RetrieveData2 $1

Wednesday, July 20, 2016

How to Create a Kafka simple Producer (Java)

package packagename;

import java.io.IOException;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

public static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);

TestProducer(long events, String topic, String fileID_gen, String a_broker) throws IOException {

//sfileID_gen="random";

Properties props = new Properties();
//props.put("metadata.broker.list", "localhost:9092");
//props.put("metadata.broker.list", "localhost:9092");
LOG.info("setting metadata.broker.list ="+a_broker);
props.put("metadata.broker.list", a_broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "packagename.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);




String msg="";
int index=0;

for (long nEvents = 0; nEvents < events; nEvents++) {


if (fileID_gen.equals("random"))
{
msg = "\""+randomChar()+"\""+ randomLorT()+"\",\"numRecords\":165848,\"hasErrors\":" + randomBoolean() + "}";
}

KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg.trim());

LOG.info("Generated data is " + msg);

LOG.info("waiting to generate file details");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
LOG.error("thread sleep error",e);
}

producer.send(data);

}// for-close


producer.close();
}// constructor-close

public boolean randomBoolean() {
return Math.random() < 0.5;
}

public String randomChar() {
Random r = new Random();
char random_char = (char) (r.nextInt(26) + 'a');
//Character.toString(random_char);
return Character.toString(random_char);
}

public String randomLorT()
{
final String alphabet = "T";
   final int N = alphabet.length();
   char random_char = 0;
   Random r = new Random();

   for (int i = 0; i <= N ; i++) {
       random_char=alphabet.charAt(r.nextInt(N));
   }
 
   return Character.toString(random_char);

}



}// TestProducer-class-close




SimplePartitioner Class



package packagename;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
public SimplePartitioner(VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1))
% a_numPartitions;
}
return partition;
}

}

Dependency:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>

Apache Drill JDBC Example


package packagename;

import java.sql.*;

public class DrillJDBCExample1 {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
    static final String DB_URL = "jdbc:drill:zk=machineName:5181/drill/x-drillbits";

//You can get this URL from drill explorer, if you have installed
//Zookeeper port is 5181 in MapR hadoop


    //static final String USER = "admin";
    //static final String PASS = "admin";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{

        Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL," "," ");
            stmt = conn.createStatement();

            /* Perform a select on data in the classpath storage plugin. */
         
            String sql = "select transaction_week_end,SUM(SumOfSale) as sq,SUM(SumOfPrice) as sp from dfs.`StoragepluginName`.`drillTableName` GROUP BY `transaction_week_end`";
         
            ResultSet rs = stmt.executeQuery(sql);

            while(rs.next()) {
            
             System.out.print(rs.getBigDecimal("sq")+"\t");
            System.out.print(rs.getBigDecimal("sp")+"\n");
            
            }

            rs.close();
            stmt.close();
            conn.close();
        } catch(SQLException se) {
            //Handle errors for JDBC
            se.printStackTrace();
        } catch(Exception e) {
            //Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            try{
                if(stmt!=null)
                    stmt.close();
            } catch(SQLException se2) {
            }
            try {
                if(conn!=null)
                    conn.close();
            } catch(SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

How to Package a Maven Project and ftp the jar files to another server, from Eclipse in single step (from Windows Machine)


In most of the scenarios, I have worked (in Hadoop Project or Spark Projects.)

I end up developing my code (mostly Jave) in a Windows machine, (with my dependencies jars added - Maven Project) and ftping the jars to Hadoop or Spark cluster.

I found an easy step to package and ftp in single step




Write a build.bat

call "c:\Program Files\Apache\Maven\bin\mvn" package

echo open sftp://UserName:Passwd@<ServerName-where-you-need-Jar> -hostkey="server's hostkey" >> ftpcmd.dat
echo cd /targer/path >> ftpcmd.dat
echo put "c:\workspace\ProjectName\target\project-0.0.1-SNAPSHOT.jar" >> ftpcmd.dat
echo put "c:\workspace\ProjectName\target\project-0.0.1-SNAPSHOT-jar-with-dependencies.jar" >> ftpcmd.dat
echo exit >> ftpcmd.dat
winscp.com /script=ftpcmd.dat
del ftpcmd.dat

Click Run -> External Tools (bottom most menu)-> External Tool Configuration

or






Every time you can click on this config from your Run menu, to package and ftp in single step

Tip: I ftp the fat Jar(with dependencies) only one time, and put a REM (REMARK - commenting a line in batch file) in the dependencies ftp line.

Also commet out the build plugin in pom.xml

Then I rename the class, (so It won't conflict with the class from the fat jar) and ftp only the jar (with out dependencies)

I pass both the jars to the program, so I can avoid complete package and ftping time. and quickly I can check the changes in cluster and develop faster :)

<!--
<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>mainClass Name</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>

<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

</plugins>
</build>

-->

Partition a parquet file using Apache Spark



SparkConf spconf = new SparkConf();

spconf.set("spark.driver.maxResultSize", "3g");

JavaSparkContext sc=null;
try
{
sc = new JavaSparkContext(spconf);

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

//HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

DataFrame df = sqlContext.read().parquet("/path/hive/warehouse/nextgen_rjr.db/source.parquet");

df.printSchema();

//df.write().partitionBy("col_A","col_B").parquet("/targetPath/source_partitioned.parquet");
df.write().partitionBy("col_A").parquet("/targetPath/source_partitioned_by_single_col.parquet");

}
finally
{
sc.close();
}

System.out.println("Partitioned Parquet File created");

Apache Spark: read from Hbase table and process the data and create Hive Table directly


package packagename;

import java.io.IOException;
import java.io.Serializable;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;

import scala.Tuple2;


public class SparkCreateSomeJoinedTableInHiveFromHbase {

public static class Item implements Serializable {

private static final long serialVersionUID = 3926135063167745103L;
private String rowKey;
private String iUUID;

public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getiUUID() {
return iUUID;
}
public void setiUUID(String iUUID) {
this.iUUID = iUUID;
}


  }

public static class Org implements Serializable {
private static final long serialVersionUID = -7380406345952297820L;
private String rowKeyOrg;
 private String oUUID;

   public String getRowKeyOrg() {
return rowKeyOrg;
}
public void setRowKeyOrg(String rowKeyOrg) {
this.rowKeyOrg = rowKeyOrg;
}
public String getoUUID() {
return oUUID;
}
public void setoUUID(String oUUID) {
this.oUUID = oUUID;
}


  }

public static void main(String[] args) throws IOException {
SparkConf spconf = new SparkConf();

spconf.set("spark.driver.maxResultSize", "3g");

JavaSparkContext sc=null;
Configuration itemsConfig = null;
Configuration orgConfig = null;
try
{
sc = new JavaSparkContext(spconf);
Configuration config = null;
try {
      config = HBaseConfiguration.create();
      System.out.println("HBase is running!");
    }
catch (Exception ce){
       ce.printStackTrace();
}


config.set(TableInputFormat.INPUT_TABLE, "/sourcetablePath/tableName");
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "x"); // column family
config.set(TableInputFormat.SCAN_COLUMNS, "x:y"); // 3 column qualifiers
//config.set(TableInputFormat.SCAN_ROW_START, "M|");
//config.set(TableInputFormat.SCAN_ROW_STOP, "M|~"); //RowKeyPattern



JavaPairRDD<ImmutableBytesWritable, Result> data = sc.newAPIHadoopRDD(config, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);


PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String> pairFunc =
       new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

String m =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("x"), Bytes.toBytes("y")));
String rowKey =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
     
     
       if (m != null && rowKey != null) {
           return new Tuple2<String, String>(rowKey,m);
       }
       else {
           return null;
       }
   }
};


JavaPairRDD<String, String> pairRdd = data.mapToPair(pairFunc);

// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

sqlContext.setConf("hive.exec.dynamic.partition", "true");
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");



DataFrame df = sqlContext.read().json(pairRdd.values());

DataFrame newDF = df.select(df.col("A").as("apple"),
     ,
//explode JSON      org.apache.spark.sql.functions.explode(df.col("JSONString")).as("flatitem"));

DataFrame finalRequiredFormatDF= newDF.select(
 newDF.col("somecol"),
 newDF.col("someDate").cast(DataTypes.DateType).as("y_date"),
     (org.apache.spark.sql.functions.when(org.apache.spark.sql.functions.size(newDF.col("flatitem.errorCodes")).equalTo(0), "N").otherwise("Y")).as("someFlag"),
     org.apache.spark.sql.functions.concat_ws("|",
     newDF.col("a"),
     newDF.col("b"),
     newDF.col("c"),
     newDF.col("d"))
     .as("abcd")
   
     );
   

finalRequiredFormatDF.printSchema();


try {

itemsConfig = HBaseConfiguration.create();
   System.out.println("HBase is running! created configuration for xyz table");
    }
catch (Exception ce){
       ce.printStackTrace();
}


itemsConfig.set(TableInputFormat.INPUT_TABLE, "/xyztable");
itemsConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "m"); // column family
itemsConfig.set(TableInputFormat.SCAN_COLUMNS, "m:iUUID"); // column qualifiers


JavaPairRDD<ImmutableBytesWritable, Result> itemsData = sc.newAPIHadoopRDD(itemsConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item> itemsFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Item> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Item item = new Item();

String iUUID =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("m"), Bytes.toBytes("iUUID")));
String rowKey =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
       
        item.setRowKey(rowKey);
        item.setiUUID(iUUID);
       
        return new Tuple2<String, Item>(rowKey,item);
     
   }
};


JavaPairRDD<String,Item> itemsPairRdd = itemsData.mapToPair(itemsFunc);
DataFrame itemsDf = sqlContext.createDataFrame(itemsPairRdd.values(), Item.class);

itemsDf.printSchema();

try {

orgConfig = HBaseConfiguration.create();
      System.out.println("HBase is running! created configuration for outlets table");
    }
catch (Exception ce){
       ce.printStackTrace();
}



orgConfig.set(TableInputFormat.INPUT_TABLE, "/someZtable");
orgConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "z"); // column family
orgConfig.set(TableInputFormat.SCAN_COLUMNS, "z:oUUID"); // column qualifiers

JavaPairRDD<ImmutableBytesWritable, Result> orgData = sc.newAPIHadoopRDD(orgConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org> orgFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Org> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Org org = new Org();

String oUUID =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("z"), Bytes.toBytes("oUUID")));
String rowKeyOrg =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
       
        org.setRowKeyOrg(rowKeyOrg);
        org.setoUUID(oUUID);
       
        return new Tuple2<String, Org>(rowKeyOrg,org);
     
   }
};


JavaPairRDD<String,Org> orgPairRdd = orgData.mapToPair(orgFunc);

DataFrame orgDf = sqlContext.createDataFrame(orgPairRdd.values(), Org.class);

orgDf.printSchema();


DataFrame joinedDF=finalRequiredFormatDF.join(org.apache.spark.sql.functions.broadcast(itemsDf), finalRequiredFormatDF.col("item_id").equalTo(itemsDf.col("rowKey")),"LEFT").join(org.apache.spark.sql.functions.broadcast(orgDf),finalRequiredFormatDF.col("outlet_id").equalTo(orgDf.col("rowKeyOrg")));

//joinedDF.printSchema();

joinedDF.select(
)
.write().mode(SaveMode.Overwrite).saveAsTable("schemaName.targetHiveTableName");
//partition is not always supported by Hive

//.write().mode(SaveMode.Overwrite).partitionBy("transaction_date").saveAsTable("schemaName.targetHiveTableName");

//if you want to still partition write as a parquet file and create Hive External table (some of the partition functions are not supported by Hive), or create a view using Apache Drill on top of the parquet file

//.write().partitionBy("transaction_date").parquet("/mapr/airavata/dev/nextgen/rjr/transaction_line_items_full_partitioned_by_Date_equi_join.parquet");

}
finally
{
sc.close();
}

System.out.println("Hive Table created");
       
}




}





  hadoop_classpath=$(hadoop classpath)
  HBASE_CLASSPATH=$(hbase classpath)
  sudo -u userName /spark-1.5.2/bin/spark-submit   --name jobName     --class packagename.SparkCreateSomeJoinedTableInHiveFromHbase --master yarn   --deploy-mode client   --num-executors 8    --total-executor-cores 32    --executor-memory 2G   --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}"   --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}"    --conf "spark.executor.extraClassPath=${hadoop_classpath}"  --jars "/path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar"  /path/projectName-0.0.1-SNAPSHOT.jar