Wednesday, July 20, 2016

Apache Spark: read from an HBase table, process the data, and create a Hive table directly


package packagename;

import java.io.IOException;
import java.io.Serializable;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;

import scala.Tuple2;


public class SparkCreateSomeJoinedTableInHiveFromHbase {

public static class Item implements Serializable {

private static final long serialVersionUID = 3926135063167745103L;
private String rowKey;
private String iUUID;

public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getiUUID() {
return iUUID;
}
public void setiUUID(String iUUID) {
this.iUUID = iUUID;
}


  }

public static class Org implements Serializable {
private static final long serialVersionUID = -7380406345952297820L;
private String rowKeyOrg;
 private String oUUID;

   public String getRowKeyOrg() {
return rowKeyOrg;
}
public void setRowKeyOrg(String rowKeyOrg) {
this.rowKeyOrg = rowKeyOrg;
}
public String getoUUID() {
return oUUID;
}
public void setoUUID(String oUUID) {
this.oUUID = oUUID;
}


  }

public static void main(String[] args) throws IOException {
SparkConf spconf = new SparkConf();

spconf.set("spark.driver.maxResultSize", "3g");

JavaSparkContext sc=null;
Configuration itemsConfig = null;
Configuration orgConfig = null;
try
{
sc = new JavaSparkContext(spconf);
Configuration config = null;
try {
      config = HBaseConfiguration.create();
      System.out.println("HBase is running!");
    }
catch (Exception ce){
       ce.printStackTrace();
}


config.set(TableInputFormat.INPUT_TABLE, "/sourcetablePath/tableName");
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "x"); // column family
config.set(TableInputFormat.SCAN_COLUMNS, "x:y"); // 3 column qualifiers
//config.set(TableInputFormat.SCAN_ROW_START, "M|");
//config.set(TableInputFormat.SCAN_ROW_STOP, "M|~"); //RowKeyPattern



JavaPairRDD<ImmutableBytesWritable, Result> data = sc.newAPIHadoopRDD(config, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);


PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String> pairFunc =
       new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

String m =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("x"), Bytes.toBytes("y")));
String rowKey =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
     
     
       if (m != null && rowKey != null) {
           return new Tuple2<String, String>(rowKey,m);
       }
       else {
           return null;
       }
   }
};


JavaPairRDD<String, String> pairRdd = data.mapToPair(pairFunc);

// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

sqlContext.setConf("hive.exec.dynamic.partition", "true");
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");



DataFrame df = sqlContext.read().json(pairRdd.values());

DataFrame newDF = df.select(df.col("A").as("apple"),
     ,
//explode JSON      org.apache.spark.sql.functions.explode(df.col("JSONString")).as("flatitem"));

DataFrame finalRequiredFormatDF= newDF.select(
 newDF.col("somecol"),
 newDF.col("someDate").cast(DataTypes.DateType).as("y_date"),
     (org.apache.spark.sql.functions.when(org.apache.spark.sql.functions.size(newDF.col("flatitem.errorCodes")).equalTo(0), "N").otherwise("Y")).as("someFlag"),
     org.apache.spark.sql.functions.concat_ws("|",
     newDF.col("a"),
     newDF.col("b"),
     newDF.col("c"),
     newDF.col("d"))
     .as("abcd")
   
     );
   

finalRequiredFormatDF.printSchema();


try {

itemsConfig = HBaseConfiguration.create();
   System.out.println("HBase is running! created configuration for xyz table");
    }
catch (Exception ce){
       ce.printStackTrace();
}


itemsConfig.set(TableInputFormat.INPUT_TABLE, "/xyztable");
itemsConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "m"); // column family
itemsConfig.set(TableInputFormat.SCAN_COLUMNS, "m:iUUID"); // column qualifiers


JavaPairRDD<ImmutableBytesWritable, Result> itemsData = sc.newAPIHadoopRDD(itemsConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item> itemsFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Item> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Item item = new Item();

String iUUID =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("m"), Bytes.toBytes("iUUID")));
String rowKey =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
       
        item.setRowKey(rowKey);
        item.setiUUID(iUUID);
       
        return new Tuple2<String, Item>(rowKey,item);
     
   }
};


JavaPairRDD<String,Item> itemsPairRdd = itemsData.mapToPair(itemsFunc);
DataFrame itemsDf = sqlContext.createDataFrame(itemsPairRdd.values(), Item.class);

itemsDf.printSchema();

try {

orgConfig = HBaseConfiguration.create();
      System.out.println("HBase is running! created configuration for outlets table");
    }
catch (Exception ce){
       ce.printStackTrace();
}



orgConfig.set(TableInputFormat.INPUT_TABLE, "/someZtable");
orgConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "z"); // column family
orgConfig.set(TableInputFormat.SCAN_COLUMNS, "z:oUUID"); // column qualifiers

JavaPairRDD<ImmutableBytesWritable, Result> orgData = sc.newAPIHadoopRDD(orgConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org> orgFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Org> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Org org = new Org();

String oUUID =  Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("z"), Bytes.toBytes("oUUID")));
String rowKeyOrg =  Bytes.toString(immutableBytesWritableResultTuple2._1.get());      
       
        org.setRowKeyOrg(rowKeyOrg);
        org.setoUUID(oUUID);
       
        return new Tuple2<String, Org>(rowKeyOrg,org);
     
   }
};


JavaPairRDD<String,Org> orgPairRdd = orgData.mapToPair(orgFunc);

DataFrame orgDf = sqlContext.createDataFrame(orgPairRdd.values(), Org.class);

orgDf.printSchema();


DataFrame joinedDF=finalRequiredFormatDF.join(org.apache.spark.sql.functions.broadcast(itemsDf), finalRequiredFormatDF.col("item_id").equalTo(itemsDf.col("rowKey")),"LEFT").join(org.apache.spark.sql.functions.broadcast(orgDf),finalRequiredFormatDF.col("outlet_id").equalTo(orgDf.col("rowKeyOrg")));

//joinedDF.printSchema();

joinedDF.select(
)
.write().mode(SaveMode.Overwrite).saveAsTable("schemaName.targetHiveTableName");
//partition is not always supported by Hive

//.write().mode(SaveMode.Overwrite).partitionBy("transaction_date").saveAsTable("schemaName.targetHiveTableName");

//if you want to still partition write as a parquet file and create Hive External table (some of the partition functions are not supported by Hive), or create a view using Apache Drill on top of the parquet file

//.write().partitionBy("transaction_date").parquet("/mapr/airavata/dev/nextgen/rjr/transaction_line_items_full_partitioned_by_Date_equi_join.parquet");

}
finally
{
sc.close();
}

System.out.println("Hive Table created");
       
}




}





  hadoop_classpath=$(hadoop classpath)
  HBASE_CLASSPATH=$(hbase classpath)
  sudo -u userName /spark-1.5.2/bin/spark-submit   --name jobName     --class packagename.SparkCreateSomeJoinedTableInHiveFromHbase --master yarn   --deploy-mode client   --num-executors 8    --total-executor-cores 32    --executor-memory 2G   --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}"   --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}"    --conf "spark.executor.extraClassPath=${hadoop_classpath}"  --jars "/path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar"  /path/projectName-0.0.1-SNAPSHOT.jar


 



No comments: