
Wednesday, August 24, 2016

How to describe an HBase table with column families and column qualifiers


package m7example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/*
 * command to run the program
java -cp  m7example-0.0.1-jar-with-dependencies.jar:`hbase classpath` m7example.RetrieveData2 <tableName>
 */

public class RetrieveData2 {

   public static void main(String[] args) throws Exception {

      String tableName = null;

      if (args.length > 0 && args[0] != null) {
         System.out.println("Table Name is " + args[0]);
         tableName = args[0];
      } else {
         System.out.println("Please input table name (with full path) as argument");
         System.exit(0);
      }



      // Instantiating Configuration class
      Configuration config = HBaseConfiguration.create();

      // Instantiating HTable class
      //HTable table = new HTable(config, "/path/students");
      HTable table = new HTable(config, tableName);

      // Instantiating HBaseAdmin class (reusing the same configuration)
      HBaseAdmin admin = new HBaseAdmin(config);

      // Verifying the existence of the table
      boolean bool = admin.tableExists(tableName);

      if (!bool) {
         System.out.println(tableName + " doesn't exist");
         System.exit(0);
      }
   

   
   
      Scan scan = new Scan();
      //Scan scan = new Scan(Bytes.toBytes("000"), Bytes.toBytes("001")); // use start-row/stop-row if you want to limit the full table scan
      ResultScanner scanner = table.getScanner(scan);

      // First pass: collect the distinct column family names seen across all rows
      ArrayList<String> cf = new ArrayList<String>();

      System.out.println("Please be patient, I am doing a full table scan");
      System.out.print("=");

      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {

         System.out.print(".");

         //String rowKey = Bytes.toString(rr.getRow());

         Iterator<KeyValue> iter = rr.list().iterator();
         while (iter.hasNext()) {
            KeyValue kv = iter.next();

            if (!cf.contains(Bytes.toString(kv.getFamily()))) {
               cf.add(Bytes.toString(kv.getFamily()));
               System.out.print("=");
            }
         } // while
      }

      // One list of qualifier names per column family discovered above
      @SuppressWarnings("unchecked")
      ArrayList<String>[] qf = new ArrayList[cf.size()];

      for (int x = 0; x < qf.length; x++) {
         qf[x] = new ArrayList<String>();
      }

      // Second pass: collect the distinct qualifiers of each column family
      scanner.close();
      scan = new Scan();
      scanner = table.getScanner(scan);

      int index = 0;

      System.out.println("Scanning qualifiers of each column family");
      System.out.print("=");

      for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {

         index = 0;

         for (String y : cf) {

            //System.out.println("Scanning qualifiers of Column family " + y);

            for (String p : getColumnsOfCF(rr, y)) {
               if (!qf[index].contains(p)) {
                  qf[index].add(p);
                  System.out.print("=");
               }
            }

            index++;
         }
      }


System.out.println("ColumnFamily and ColumnQualifiers of "+tableName);
System.out.println("============================================================================");
index=0;

for(index=0;index<cf.size();index++)
{
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
System.out.println(cf.get(index));
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");


for(String x:qf[index])
{
System.out.println(cf.get(index)+":"+x);
}

}

   
      table.close();
   }
 
   public static String[] getColumnsOfCF(Result r, String columnFamily) {

      NavigableMap<byte[], byte[]> familyMap = r.getFamilyMap(Bytes.toBytes(columnFamily));

      // A row need not contain every column family; guard against a missing family
      if (familyMap == null) {
         return new String[0];
      }

      String[] qfs = new String[familyMap.size()];

      int counter = 0;
      for (byte[] bytqfs : familyMap.keySet()) {
         qfs[counter++] = Bytes.toString(bytqfs);
      }

      return qfs;
   }
 
 
}
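
Side note: the column family names alone can also be read from the table descriptor without a full scan; only the qualifiers require scanning, because HBase keeps qualifiers per cell rather than in the table schema. Below is a minimal sketch using the same HBase 0.94 client API (the table path is a placeholder):

package m7example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class ListColumnFamilies {

   public static void main(String[] args) throws Exception {

      Configuration conf = HBaseConfiguration.create();
      HBaseAdmin admin = new HBaseAdmin(conf);

      // "/path/students" is a placeholder; pass your own table path
      HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes("/path/students"));

      // Column families are part of the table schema, so no scan is needed
      for (HColumnDescriptor family : desc.getColumnFamilies()) {
         System.out.println(family.getNameAsString());
      }
   }
}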


pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mapr.support</groupId>
  <artifactId>m7example</artifactId>
  <version>0.0.1</version>


  <repositories>
    <repository>
      <id>mapr-maven</id>
      <url>http://repository.mapr.com/maven</url>
      <releases><enabled>true</enabled></releases>
      <snapshots><enabled>false</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>0.94.13-mapr-1401-m7-3.1.0</version>
    </dependency>
  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.5</source>
          <target>1.5</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>m7example.M7Demo</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
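
A note on the build: the maven-assembly-plugin execution bound to the package phase is what produces the m7example-0.0.1-jar-with-dependencies.jar referenced in the run command above, so a plain package build is enough:

mvn clean package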


Shell script to call the Java program

hbase-describe-All.sh
~~~~~~~~~~~~~~~~
#!/bin/bash

ARGS_EXPECTED=1

if [ $# -ne $ARGS_EXPECTED ]
then
  echo "[$HOSTNAME]: Usage: `basename $0` table_name(with full path)"
  exit 1
fi

java -cp  /path-to-jar/m7example-0.0.1-jar-with-dependencies.jar:`hbase classpath` m7example.RetrieveData2 $1
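
Example invocation (the table path below is just a placeholder; use your own table path):

./hbase-describe-All.sh /path/students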

Wednesday, July 20, 2016

Apache Spark: read from an HBase table, process the data, and create a Hive table directly


package packagename;

import java.io.IOException;
import java.io.Serializable;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;

import scala.Tuple2;


public class SparkCreateSomeJoinedTableInHiveFromHbase {
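
// Bean classes for the two lookup tables: SQLContext.createDataFrame(rdd, BeanClass.class)
// derives the DataFrame columns from their getters.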

public static class Item implements Serializable {

private static final long serialVersionUID = 3926135063167745103L;
private String rowKey;
private String iUUID;

public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getiUUID() {
return iUUID;
}
public void setiUUID(String iUUID) {
this.iUUID = iUUID;
}


  }

public static class Org implements Serializable {
private static final long serialVersionUID = -7380406345952297820L;
private String rowKeyOrg;
 private String oUUID;

   public String getRowKeyOrg() {
return rowKeyOrg;
}
public void setRowKeyOrg(String rowKeyOrg) {
this.rowKeyOrg = rowKeyOrg;
}
public String getoUUID() {
return oUUID;
}
public void setoUUID(String oUUID) {
this.oUUID = oUUID;
}


  }

public static void main(String[] args) throws IOException {
SparkConf spconf = new SparkConf();

spconf.set("spark.driver.maxResultSize", "3g");

JavaSparkContext sc=null;
Configuration itemsConfig = null;
Configuration orgConfig = null;
try
{
sc = new JavaSparkContext(spconf);
Configuration config = null;
try {
      config = HBaseConfiguration.create();
      System.out.println("HBase is running!");
    }
catch (Exception ce){
       ce.printStackTrace();
}


config.set(TableInputFormat.INPUT_TABLE, "/sourcetablePath/tableName");
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "x"); // column family
config.set(TableInputFormat.SCAN_COLUMNS, "x:y"); // column qualifier(s); space-separated if more than one
//config.set(TableInputFormat.SCAN_ROW_START, "M|");
//config.set(TableInputFormat.SCAN_ROW_STOP, "M|~"); // row-key pattern to bound the scan



// Read the source HBase table as an RDD of (row key, Result) pairs via TableInputFormat
JavaPairRDD<ImmutableBytesWritable, Result> data = sc.newAPIHadoopRDD(config, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);


PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String> pairFunc =
       new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

String m = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("x"), Bytes.toBytes("y")));
String rowKey = Bytes.toString(immutableBytesWritableResultTuple2._1().get());
     
     
       if (m != null && rowKey != null) {
           return new Tuple2<String, String>(rowKey,m);
       }
       else {
           return null;
       }
   }
};


JavaPairRDD<String, String> pairRdd = data.mapToPair(pairFunc);

// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

sqlContext.setConf("hive.exec.dynamic.partition", "true");
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");



// The cell values are JSON strings; parse them into a DataFrame
DataFrame df = sqlContext.read().json(pairRdd.values());

// Explode the nested JSON array so each element becomes its own row
DataFrame newDF = df.select(df.col("A").as("apple"),
     org.apache.spark.sql.functions.explode(df.col("JSONString")).as("flatitem"));

DataFrame finalRequiredFormatDF = newDF.select(
     newDF.col("somecol"),
     newDF.col("someDate").cast(DataTypes.DateType).as("y_date"),
     (org.apache.spark.sql.functions.when(org.apache.spark.sql.functions.size(newDF.col("flatitem.errorCodes")).equalTo(0), "N").otherwise("Y")).as("someFlag"),
     org.apache.spark.sql.functions.concat_ws("|",
          newDF.col("a"),
          newDF.col("b"),
          newDF.col("c"),
          newDF.col("d")).as("abcd")
     );
   

finalRequiredFormatDF.printSchema();


try {

itemsConfig = HBaseConfiguration.create();
   System.out.println("HBase is running! created configuration for xyz table");
    }
catch (Exception ce){
       ce.printStackTrace();
}


itemsConfig.set(TableInputFormat.INPUT_TABLE, "/xyztable");
itemsConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "m"); // column family
itemsConfig.set(TableInputFormat.SCAN_COLUMNS, "m:iUUID"); // column qualifiers


JavaPairRDD<ImmutableBytesWritable, Result> itemsData = sc.newAPIHadoopRDD(itemsConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item> itemsFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Item> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Item item = new Item();

String iUUID = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("m"), Bytes.toBytes("iUUID")));
String rowKey = Bytes.toString(immutableBytesWritableResultTuple2._1().get());
       
        item.setRowKey(rowKey);
        item.setiUUID(iUUID);
       
        return new Tuple2<String, Item>(rowKey,item);
     
   }
};


JavaPairRDD<String,Item> itemsPairRdd = itemsData.mapToPair(itemsFunc);
DataFrame itemsDf = sqlContext.createDataFrame(itemsPairRdd.values(), Item.class);

itemsDf.printSchema();

try {

orgConfig = HBaseConfiguration.create();
      System.out.println("HBase is running! created configuration for outlets table");
    }
catch (Exception ce){
       ce.printStackTrace();
}



orgConfig.set(TableInputFormat.INPUT_TABLE, "/someZtable");
orgConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "z"); // column family
orgConfig.set(TableInputFormat.SCAN_COLUMNS, "z:oUUID"); // column qualifiers

JavaPairRDD<ImmutableBytesWritable, Result> orgData = sc.newAPIHadoopRDD(orgConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org> orgFunc =  new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org>() {
 
private static final long serialVersionUID = -9049583931936132447L;

public Tuple2<String,Org> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {

Org org = new Org();

String oUUID = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("z"), Bytes.toBytes("oUUID")));
String rowKeyOrg = Bytes.toString(immutableBytesWritableResultTuple2._1().get());
       
        org.setRowKeyOrg(rowKeyOrg);
        org.setoUUID(oUUID);
       
        return new Tuple2<String, Org>(rowKeyOrg,org);
     
   }
};


JavaPairRDD<String,Org> orgPairRdd = orgData.mapToPair(orgFunc);

DataFrame orgDf = sqlContext.createDataFrame(orgPairRdd.values(), Org.class);

orgDf.printSchema();


DataFrame joinedDF = finalRequiredFormatDF
    .join(org.apache.spark.sql.functions.broadcast(itemsDf),
          finalRequiredFormatDF.col("item_id").equalTo(itemsDf.col("rowKey")), "LEFT")
    .join(org.apache.spark.sql.functions.broadcast(orgDf),
          finalRequiredFormatDF.col("outlet_id").equalTo(orgDf.col("rowKeyOrg")));

//joinedDF.printSchema();

joinedDF
    // select only the columns required in the target table here, e.g. .select( ... )
    .write().mode(SaveMode.Overwrite).saveAsTable("schemaName.targetHiveTableName");

// partitionBy() together with saveAsTable() is not always supported by Hive:
//.write().mode(SaveMode.Overwrite).partitionBy("transaction_date").saveAsTable("schemaName.targetHiveTableName");

//if you want to still partition write as a parquet file and create Hive External table (some of the partition functions are not supported by Hive), or create a view using Apache Drill on top of the parquet file

//.write().partitionBy("transaction_date").parquet("/mapr/airavata/dev/nextgen/rjr/transaction_line_items_full_partitioned_by_Date_equi_join.parquet");

}
finally
{
sc.close();
}

System.out.println("Hive Table created");
       
}




}





hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)

# Note: repeating --conf with the same key keeps only the last value, so the HBase and
# Hadoop classpaths are combined into a single spark.executor.extraClassPath setting.
sudo -u userName /spark-1.5.2/bin/spark-submit \
  --name jobName \
  --class packagename.SparkCreateSomeJoinedTableInHiveFromHbase \
  --master yarn \
  --deploy-mode client \
  --num-executors 8 \
  --total-executor-cores 32 \
  --executor-memory 2G \
  --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}" \
  --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}" \
  --jars "/path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar" \
  /path/projectName-0.0.1-SNAPSHOT.jar