package packagename;
import java.io.IOException;
import java.io.Serializable;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import scala.Tuple2;
public class SparkCreateSomeJoinedTableInHiveFromHbase {
public static class Item implements Serializable {
private static final long serialVersionUID = 3926135063167745103L;
private String rowKey;
private String iUUID;
public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getiUUID() {
return iUUID;
}
public void setiUUID(String iUUID) {
this.iUUID = iUUID;
}
}
public static class Org implements Serializable {
private static final long serialVersionUID = -7380406345952297820L;
private String rowKeyOrg;
private String oUUID;
public String getRowKeyOrg() {
return rowKeyOrg;
}
public void setRowKeyOrg(String rowKeyOrg) {
this.rowKeyOrg = rowKeyOrg;
}
public String getoUUID() {
return oUUID;
}
public void setoUUID(String oUUID) {
this.oUUID = oUUID;
}
}
public static void main(String[] args) throws IOException {
SparkConf spconf = new SparkConf();
spconf.set("spark.driver.maxResultSize", "3g");
JavaSparkContext sc=null;
Configuration itemsConfig = null;
Configuration orgConfig = null;
try
{
sc = new JavaSparkContext(spconf);
Configuration config = null;
try {
config = HBaseConfiguration.create();
System.out.println("HBase is running!");
}
catch (Exception ce){
ce.printStackTrace();
}
config.set(TableInputFormat.INPUT_TABLE, "/sourcetablePath/tableName");
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "x"); // column family
config.set(TableInputFormat.SCAN_COLUMNS, "x:y"); // 3 column qualifiers
//config.set(TableInputFormat.SCAN_ROW_START, "M|");
//config.set(TableInputFormat.SCAN_ROW_STOP, "M|~"); //RowKeyPattern
JavaPairRDD<ImmutableBytesWritable, Result> data = sc.newAPIHadoopRDD(config, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String> pairFunc =
new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
private static final long serialVersionUID = -9049583931936132447L;
public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
String m = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("x"), Bytes.toBytes("y")));
String rowKey = Bytes.toString(immutableBytesWritableResultTuple2._1.get());
if (m != null && rowKey != null) {
return new Tuple2<String, String>(rowKey,m);
}
else {
return null;
}
}
};
JavaPairRDD<String, String> pairRdd = data.mapToPair(pairFunc);
// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
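// A HiveContext (rather than a plain SQLContext) is needed so that saveAsTable() below
// registers the result as a persistent table in the Hive metastore.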
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
sqlContext.setConf("hive.exec.dynamic.partition", "true");
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");
DataFrame df = sqlContext.read().json(pairRdd.values());
DataFrame newDF = df.select(df.col("A").as("apple"),
,
//explode JSON org.apache.spark.sql.functions.explode(df.col("JSONString")).as("flatitem"));
DataFrame finalRequiredFormatDF= newDF.select(
newDF.col("somecol"),
newDF.col("someDate").cast(DataTypes.DateType).as("y_date"),
(org.apache.spark.sql.functions.when(org.apache.spark.sql.functions.size(newDF.col("flatitem.errorCodes")).equalTo(0), "N").otherwise("Y")).as("someFlag"),
org.apache.spark.sql.functions.concat_ws("|",
newDF.col("a"),
newDF.col("b"),
newDF.col("c"),
newDF.col("d"))
.as("abcd")
);
finalRequiredFormatDF.printSchema();
try {
itemsConfig = HBaseConfiguration.create();
System.out.println("HBase is running! created configuration for xyz table");
}
catch (Exception ce){
ce.printStackTrace();
}
itemsConfig.set(TableInputFormat.INPUT_TABLE, "/xyztable");
itemsConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "m"); // column family
itemsConfig.set(TableInputFormat.SCAN_COLUMNS, "m:iUUID"); // column qualifiers
JavaPairRDD<ImmutableBytesWritable, Result> itemsData = sc.newAPIHadoopRDD(itemsConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item> itemsFunc = new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item>() {
private static final long serialVersionUID = -9049583931936132447L;
public Tuple2<String,Item> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
Item item = new Item();
String iUUID = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("m"), Bytes.toBytes("iUUID")));
String rowKey = Bytes.toString(immutableBytesWritableResultTuple2._1.get());
item.setRowKey(rowKey);
item.setiUUID(iUUID);
return new Tuple2<String, Item>(rowKey,item);
}
};
JavaPairRDD<String,Item> itemsPairRdd = itemsData.mapToPair(itemsFunc);
DataFrame itemsDf = sqlContext.createDataFrame(itemsPairRdd.values(), Item.class);
itemsDf.printSchema();
try {
orgConfig = HBaseConfiguration.create();
System.out.println("HBase is running! created configuration for outlets table");
}
catch (Exception ce){
ce.printStackTrace();
}
orgConfig.set(TableInputFormat.INPUT_TABLE, "/someZtable");
orgConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "z"); // column family
orgConfig.set(TableInputFormat.SCAN_COLUMNS, "z:oUUID"); // column qualifiers
JavaPairRDD<ImmutableBytesWritable, Result> orgData = sc.newAPIHadoopRDD(orgConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org> orgFunc = new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org>() {
private static final long serialVersionUID = -9049583931936132447L;
public Tuple2<String,Org> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
Org org = new Org();
String oUUID = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("z"), Bytes.toBytes("oUUID")));
String rowKeyOrg = Bytes.toString(immutableBytesWritableResultTuple2._1.get());
org.setRowKeyOrg(rowKeyOrg);
org.setoUUID(oUUID);
return new Tuple2<String, Org>(rowKeyOrg,org);
}
};
JavaPairRDD<String,Org> orgPairRdd = orgData.mapToPair(orgFunc);
DataFrame orgDf = sqlContext.createDataFrame(orgPairRdd.values(), Org.class);
orgDf.printSchema();
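// Join the large fact DataFrame with the two small lookup DataFrames; broadcast() hints Spark
// to replicate the small side to every executor so the large side does not have to be shuffled.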
DataFrame joinedDF = finalRequiredFormatDF
    .join(org.apache.spark.sql.functions.broadcast(itemsDf),
        finalRequiredFormatDF.col("item_id").equalTo(itemsDf.col("rowKey")), "left_outer")
    .join(org.apache.spark.sql.functions.broadcast(orgDf),
        finalRequiredFormatDF.col("outlet_id").equalTo(orgDf.col("rowKeyOrg")));
//joinedDF.printSchema();
// List/rename the required output columns here (e.g. joinedDF.select(...)) if the full joined
// schema is not wanted, then write to the target Hive table.
joinedDF.write().mode(SaveMode.Overwrite).saveAsTable("schemaName.targetHiveTableName");
// partitionBy() together with saveAsTable() is not always supported by Hive:
//joinedDF.write().mode(SaveMode.Overwrite).partitionBy("transaction_date").saveAsTable("schemaName.targetHiveTableName");
// If partitioning is still needed, write a partitioned Parquet file and create a Hive external
// table over it (some of the partition functions are not supported by Hive), or create a view
// on top of the Parquet file with Apache Drill:
//joinedDF.write().partitionBy("transaction_date").parquet("/mapr/airavata/dev/nextgen/rjr/transaction_line_items_full_partitioned_by_Date_equi_join.parquet");
}
finally
{
if (sc != null) { sc.close(); }
}
System.out.println("Hive Table created");
}
}
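If the partitioned write hinted at in the comments above is needed, a rough sketch of the Parquet route could look like the snippet below. It is only a sketch: it reuses joinedDF from main(), assumes the joined result really contains a transaction_date column, and uses a placeholder output path.

// Sketch: write the joined result as date-partitioned Parquet instead of saveAsTable().
joinedDF.write()
    .mode(SaveMode.Overwrite)
    .partitionBy("transaction_date")       // assumed partition column
    .parquet("/some/output/path.parquet"); // hypothetical output directory
// A Hive external table (CREATE EXTERNAL TABLE ... STORED AS PARQUET LOCATION '...')
// or an Apache Drill view can then be defined over that directory.

The job itself is submitted with spark-submit: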
hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)
# 8 executors x 4 cores each = 32 cores in total; the HBase and Hadoop classpaths are combined
# into a single spark.executor.extraClassPath value, since repeated --conf keys override each other.
sudo -u userName /spark-1.5.2/bin/spark-submit \
  --name jobName \
  --class packagename.SparkCreateSomeJoinedTableInHiveFromHbase \
  --master yarn \
  --deploy-mode client \
  --num-executors 8 \
  --executor-cores 4 \
  --executor-memory 2G \
  --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}" \
  --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}" \
  --jars "/path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar" \
  /path/projectName-0.0.1-SNAPSHOT.jar
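Once the job completes, the target table can be spot-checked from the Hive CLI, for example:

hive -e "SELECT COUNT(*) FROM schemaName.targetHiveTableName"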