Anand's blog
Wednesday, August 24, 2016
How to describe an HBase table with column families and column qualifiers
package m7example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/*
* command to run the program
java -cp m7example-0.0.1-jar-with-dependencies.jar:`hbase classpath` m7example.RetrieveData2 <tableName>
*/
public class RetrieveData2{
public static void main(String[] args) throws IOException, Exception{
String tableName=null;
if(args.length>0)
{
if(args[0] != null)
{
System.out.println("Table Name is "+args[0]);
tableName=args[0];
}
}
else
{
System.out.println("Please input table name (with full path) as argument");
System.exit(0);
}
// Instantiating Configuration class
Configuration config = HBaseConfiguration.create();
// Instantiating HBaseAdmin class and verifying that the table exists before opening it
HBaseAdmin admin = new HBaseAdmin(config);
boolean exists = admin.tableExists(tableName);
if(!exists)
{
System.out.println(tableName+" doesn't exist");
System.exit(0);
}
// Instantiating HTable class
//HTable table = new HTable(config, "/path/students");
HTable table = new HTable(config,tableName);
Scan scan = new Scan();
//Scan scan = new Scan(Bytes.toBytes("000"),Bytes.toBytes("001")); //use start-row/stop-row if you want to limit the full table scan
ResultScanner scanner = table.getScanner(scan);
ArrayList<String> cf =new ArrayList<String>();
System.out.println("Please be patient, I am doing a full table scan");
System.out.print("=");
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
System.out.print(".");
//String rowKey= Bytes.toString(rr.getRow());
Iterator<KeyValue> iter = rr.list().iterator();
while (iter.hasNext()) {
KeyValue kv = iter.next();
if(!cf.contains(Bytes.toString(kv.getFamily())))
{
cf.add(Bytes.toString(kv.getFamily()));
System.out.print("=");
}
}//while
}
ArrayList<String> qf[] = new ArrayList[cf.size()]; // one list of column qualifiers per column family
for(int x = 0; x < qf.length; x++){
qf[x] = new ArrayList<>();
}
scan = new Scan();
scanner = table.getScanner(scan);
int index=0;
System.out.println("Scanning qualifiers of Column family ");
System.out.print("=");
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
index=0;
for(String y:cf)
{
//System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
//System.out.println("Scanning qualifiers of Column family " + y);
//System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
//index=cf.indexOf(y);
for (String p: getColumnsOfCF(rr,y))
{
if(!qf[index].contains(p))
{
qf[index].add(p);
System.out.print("=");
}
}
index++;
}
}
System.out.println("ColumnFamily and ColumnQualifiers of "+tableName);
System.out.println("============================================================================");
index=0;
for(index=0;index<cf.size();index++)
{
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
System.out.println(cf.get(index));
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
for(String x:qf[index])
{
System.out.println(cf.get(index)+":"+x);
}
}
table.close();
}
public static String[] getColumnsOfCF(Result r, String ColumnFamily)
{
NavigableMap<byte[], byte[]> familyMap = r.getFamilyMap(Bytes.toBytes(ColumnFamily));
String[] qfs = new String[familyMap.size()];
int counter = 0;
for(byte[] bytqfs : familyMap.keySet())
{
qfs[counter++] = Bytes.toString(bytqfs);
}
return qfs;
}
}
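As a side note, the column families by themselves can be read from the table descriptor without any scan; only the qualifiers require scanning the data, since HBase keeps no schema of qualifiers. A minimal sketch using the same 0.94-era client API, reusing the admin and tableName variables from the program above:
// List the column families from the table metadata (no full scan needed).
org.apache.hadoop.hbase.HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(tableName));
for (org.apache.hadoop.hbase.HColumnDescriptor hcd : desc.getColumnFamilies()) {
    System.out.println("Column family: " + hcd.getNameAsString());
}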
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mapr.support</groupId>
<artifactId>m7example</artifactId>
<version>0.0.1</version>
<repositories>
<repository>
<id>mapr-maven</id>
<url>http://repository.mapr.com/maven</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.13-mapr-1401-m7-3.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>5</source>
<target>5</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>m7example.M7Demo</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
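To build the fat jar referenced in the run command above (the assembly plugin is bound to the package phase), run Maven from the project directory; this assumes Maven is installed and on the PATH:
mvn clean package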
Shell script to call the Java program:
hbase-describe-All.sh
~~~~~~~~~~~~~~~~
#!/bin/bash
ARGS_EXPECTED=1
if [ $# -ne $ARGS_EXPECTED ]
then
echo "[$HOSTNAME]: Usage: `basename $0` table_name(with full path)"
exit 1
fi
java -cp /path-to-jar/m7example-0.0.1-jar-with-dependencies.jar:`hbase classpath` m7example.RetrieveData2 $1
Labels:
column families,
column qualifiers,
describe,
describeall,
Hbase,
java api,
java program,
mapr-DB
Wednesday, July 20, 2016
How to Create a Simple Kafka Producer (Java)
package packagename;
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);
TestProducer(long events, String topic, String fileID_gen, String a_broker) throws IOException {
//sfileID_gen="random";
Properties props = new Properties();
//props.put("metadata.broker.list", "localhost:9092");
LOG.info("setting metadata.broker.list ="+a_broker);
props.put("metadata.broker.list", a_broker);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "packagename.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String msg="";
int index=0;
for (long nEvents = 0; nEvents < events; nEvents++) {
if (fileID_gen.equals("random"))
{
msg = "\""+randomChar()+"\""+ randomLorT()+"\",\"numRecords\":165848,\"hasErrors\":" + randomBoolean() + "}";
}
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg.trim());
LOG.info("Generated data is " + msg);
LOG.info("waiting to generate file details");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
LOG.error("thread sleep error",e);
}
producer.send(data);
}// for-close
producer.close();
}// constructor-close
public boolean randomBoolean() {
return Math.random() < 0.5;
}
public String randomChar() {
Random r = new Random();
char random_char = (char) (r.nextInt(26) + 'a');
//Character.toString(random_char);
return Character.toString(random_char);
}
public String randomLorT()
{
// the alphabet is currently restricted to "T"; add more characters (e.g. "LT") to vary the output
final String alphabet = "T";
Random r = new Random();
char random_char = alphabet.charAt(r.nextInt(alphabet.length()));
return Character.toString(random_char);
}
}// TestProducer-class-close
SimplePartitioner class:
package packagename;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner(VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1))
% a_numPartitions;
}
return partition;
}
}
Dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
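The producer above only defines a constructor, so a small driver class is needed to run it. A minimal sketch (the driver class name, default topic, and broker address are placeholders; the argument order follows the constructor above):
package packagename;
import java.io.IOException;
public class TestProducerDriver {
    public static void main(String[] args) throws IOException {
        // events to send, topic, fileID_gen ("random" triggers message generation), broker list
        long events = args.length > 0 ? Long.parseLong(args[0]) : 10;
        String topic = args.length > 1 ? args[1] : "test-topic";
        String broker = args.length > 2 ? args[2] : "localhost:9092";
        new TestProducer(events, topic, "random", broker);
    }
}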
Apache Drill JDBC Example
package packagename;
import java.sql.*;
public class DrillJDBCExample1 {
static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
static final String DB_URL = "jdbc:drill:zk=machineName:5181/drill/x-drillbits";
//You can get this URL from Drill Explorer, if you have it installed
//The ZooKeeper port is 5181 in MapR Hadoop
//static final String USER = "admin";
//static final String PASS = "admin";
public static void main(String[] args) {
Connection conn = null;
Statement stmt = null;
try{
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL," "," ");
stmt = conn.createStatement();
/* Run an aggregate query against a table exposed through the dfs storage plugin. */
String sql = "select transaction_week_end,SUM(SumOfSale) as sq,SUM(SumOfPrice) as sp from dfs.`StoragepluginName`.`drillTableName` GROUP BY `transaction_week_end`";
ResultSet rs = stmt.executeQuery(sql);
while(rs.next()) {
System.out.print(rs.getBigDecimal("sq")+"\t");
System.out.print(rs.getBigDecimal("sp")+"\n");
}
rs.close();
stmt.close();
conn.close();
} catch(SQLException se) {
//Handle errors for JDBC
se.printStackTrace();
} catch(Exception e) {
//Handle errors for Class.forName
e.printStackTrace();
} finally {
try{
if(stmt!=null)
stmt.close();
} catch(SQLException se2) {
}
try {
if(conn!=null)
conn.close();
} catch(SQLException se) {
se.printStackTrace();
}
}
}
}
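To compile and run this against a Drill cluster you also need the Drill JDBC driver on the classpath. If the project is built with Maven, the driver is available as the drill-jdbc-all artifact (the version property here is a placeholder; match it to your Drill installation):
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-jdbc-all</artifactId>
<version>${drill.version}</version>
</dependency>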
How to Package a Maven Project and FTP the Jar Files to Another Server from Eclipse in a Single Step (from a Windows Machine)
In most of the projects I have worked on (Hadoop or Spark projects),
I end up developing my code (mostly Java) on a Windows machine as a Maven project, with the dependency jars added, and then FTPing the jars to the Hadoop or Spark cluster.
Here is an easy way to package and FTP in a single step.
Write a build.bat (this uses WinSCP's console client, winscp.com, for the SFTP upload):
call "c:\Program Files\Apache\Maven\bin\mvn" package
echo open sftp://UserName:Passwd@<ServerName-where-you-need-Jar> -hostkey="server's hostkey" >> ftpcmd.dat
echo cd /target/path >> ftpcmd.dat
echo put "c:\workspace\ProjectName\target\project-0.0.1-SNAPSHOT.jar" >> ftpcmd.dat
echo put "c:\workspace\ProjectName\target\project-0.0.1-SNAPSHOT-jar-with-dependencies.jar" >> ftpcmd.dat
echo exit >> ftpcmd.dat
winscp.com /script=ftpcmd.dat
del ftpcmd.dat
In Eclipse, click Run -> External Tools -> External Tools Configurations..., create a new Program configuration that points at build.bat, and save it.
From then on you can run this configuration from the Run menu to package and FTP in a single step; a sketch of the configuration follows.
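A minimal sketch of that External Tools configuration (Program type; the name and paths are placeholders for your own workspace):
Name: package-and-ftp
Location: C:\workspace\ProjectName\build.bat
Working Directory: C:\workspace\ProjectName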
Tip: I FTP the fat jar (with dependencies) only once, then put a REM (the batch-file comment keyword) in front of the line in build.bat that uploads it.
Then I rename the class (so it won't conflict with the class inside the fat jar) and FTP only the thin jar (without dependencies).
I pass both jars to the program, so I avoid the full package-and-FTP time and can quickly check my changes on the cluster and develop faster :)
I also comment out the build plugins in pom.xml:
<!--
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>mainClass Name</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
-->
Labels:
automatically ftp,
build batch script,
eclipse,
fat jar,
ftp,
jar,
jar file,
maven
Partition a Parquet file using Apache Spark
SparkConf spconf = new SparkConf();
spconf.set("spark.driver.maxResultSize", "3g");
JavaSparkContext sc=null;
try
{
sc = new JavaSparkContext(spconf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
//HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
DataFrame df = sqlContext.read().parquet("/path/hive/warehouse/nextgen_rjr.db/source.parquet");
df.printSchema();
//df.write().partitionBy("col_A","col_B").parquet("/targetPath/source_partitioned.parquet");
df.write().partitionBy("col_A").parquet("/targetPath/source_partitioned_by_single_col.parquet");
}
finally
{
sc.close();
}
System.out.println("Partitioned Parquet File created");
Apache Spark: read from an HBase table, process the data, and create a Hive table directly
package packagename;
import java.io.IOException;
import java.io.Serializable;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import scala.Tuple2;
public class SparkCreateSomeJoinedTableInHiveFromHbase {
public static class Item implements Serializable {
private static final long serialVersionUID = 3926135063167745103L;
private String rowKey;
private String iUUID;
public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getiUUID() {
return iUUID;
}
public void setiUUID(String iUUID) {
this.iUUID = iUUID;
}
}
public static class Org implements Serializable {
private static final long serialVersionUID = -7380406345952297820L;
private String rowKeyOrg;
private String oUUID;
public String getRowKeyOrg() {
return rowKeyOrg;
}
public void setRowKeyOrg(String rowKeyOrg) {
this.rowKeyOrg = rowKeyOrg;
}
public String getoUUID() {
return oUUID;
}
public void setoUUID(String oUUID) {
this.oUUID = oUUID;
}
}
public static void main(String[] args) throws IOException {
SparkConf spconf = new SparkConf();
spconf.set("spark.driver.maxResultSize", "3g");
JavaSparkContext sc=null;
Configuration itemsConfig = null;
Configuration orgConfig = null;
try
{
sc = new JavaSparkContext(spconf);
Configuration config = null;
try {
config = HBaseConfiguration.create();
System.out.println("HBase is running!");
}
catch (Exception ce){
ce.printStackTrace();
}
config.set(TableInputFormat.INPUT_TABLE, "/sourcetablePath/tableName");
config.set(TableInputFormat.SCAN_COLUMN_FAMILY, "x"); // column family
config.set(TableInputFormat.SCAN_COLUMNS, "x:y"); // column qualifier(s) to scan
//config.set(TableInputFormat.SCAN_ROW_START, "M|");
//config.set(TableInputFormat.SCAN_ROW_STOP, "M|~"); //RowKeyPattern
JavaPairRDD<ImmutableBytesWritable, Result> data = sc.newAPIHadoopRDD(config, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String> pairFunc =
new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
private static final long serialVersionUID = -9049583931936132447L;
public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
String m = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("x"), Bytes.toBytes("y")));
String rowKey = Bytes.toString(immutableBytesWritableResultTuple2._1.get());
if (m != null && rowKey != null) {
return new Tuple2<String, String>(rowKey,m);
}
else {
return null;
}
}
};
JavaPairRDD<String, String> pairRdd = data.mapToPair(pairFunc);
// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
sqlContext.setConf("hive.exec.dynamic.partition", "true");
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict");
DataFrame df = sqlContext.read().json(pairRdd.values());
DataFrame newDF = df.select(df.col("A").as("apple"),
// ... other selected columns elided ...
org.apache.spark.sql.functions.explode(df.col("JSONString")).as("flatitem")); // explode the nested JSON into one row per element
DataFrame finalRequiredFormatDF= newDF.select(
newDF.col("somecol"),
newDF.col("someDate").cast(DataTypes.DateType).as("y_date"),
(org.apache.spark.sql.functions.when(org.apache.spark.sql.functions.size(newDF.col("flatitem.errorCodes")).equalTo(0), "N").otherwise("Y")).as("someFlag"),
org.apache.spark.sql.functions.concat_ws("|",
newDF.col("a"),
newDF.col("b"),
newDF.col("c"),
newDF.col("d"))
.as("abcd")
);
finalRequiredFormatDF.printSchema();
try {
itemsConfig = HBaseConfiguration.create();
System.out.println("HBase is running! created configuration for xyz table");
}
catch (Exception ce){
ce.printStackTrace();
}
itemsConfig.set(TableInputFormat.INPUT_TABLE, "/xyztable");
itemsConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "m"); // column family
itemsConfig.set(TableInputFormat.SCAN_COLUMNS, "m:iUUID"); // column qualifiers
JavaPairRDD<ImmutableBytesWritable, Result> itemsData = sc.newAPIHadoopRDD(itemsConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item> itemsFunc = new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Item>() {
private static final long serialVersionUID = -9049583931936132447L;
public Tuple2<String,Item> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
Item item = new Item();
String iUUID = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("m"), Bytes.toBytes("iUUID")));
String rowKey = Bytes.toString(immutableBytesWritableResultTuple2._1.get());
item.setRowKey(rowKey);
item.setiUUID(iUUID);
return new Tuple2<String, Item>(rowKey,item);
}
};
JavaPairRDD<String,Item> itemsPairRdd = itemsData.mapToPair(itemsFunc);
DataFrame itemsDf = sqlContext.createDataFrame(itemsPairRdd.values(), Item.class);
itemsDf.printSchema();
try {
orgConfig = HBaseConfiguration.create();
System.out.println("HBase is running! created configuration for outlets table");
}
catch (Exception ce){
ce.printStackTrace();
}
orgConfig.set(TableInputFormat.INPUT_TABLE, "/someZtable");
orgConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "z"); // column family
orgConfig.set(TableInputFormat.SCAN_COLUMNS, "z:oUUID"); // column qualifiers
JavaPairRDD<ImmutableBytesWritable, Result> orgData = sc.newAPIHadoopRDD(orgConfig, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org> orgFunc = new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String,Org>() {
private static final long serialVersionUID = -9049583931936132447L;
public Tuple2<String,Org> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
Org org = new Org();
String oUUID = Bytes.toString(immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("z"), Bytes.toBytes("oUUID")));
String rowKeyOrg = Bytes.toString(immutableBytesWritableResultTuple2._1.get());
org.setRowKeyOrg(rowKeyOrg);
org.setoUUID(oUUID);
return new Tuple2<String, Org>(rowKeyOrg,org);
}
};
JavaPairRDD<String,Org> orgPairRdd = orgData.mapToPair(orgFunc);
DataFrame orgDf = sqlContext.createDataFrame(orgPairRdd.values(), Org.class);
orgDf.printSchema();
DataFrame joinedDF=finalRequiredFormatDF.join(org.apache.spark.sql.functions.broadcast(itemsDf), finalRequiredFormatDF.col("item_id").equalTo(itemsDf.col("rowKey")),"LEFT").join(org.apache.spark.sql.functions.broadcast(orgDf),finalRequiredFormatDF.col("outlet_id").equalTo(orgDf.col("rowKeyOrg")));
//joinedDF.printSchema();
joinedDF.select(
// ... output columns elided ...
)
.write().mode(SaveMode.Overwrite).saveAsTable("schemaName.targetHiveTableName");
//partitioned saveAsTable is not always supported by Hive
//.write().mode(SaveMode.Overwrite).partitionBy("transaction_date").saveAsTable("schemaName.targetHiveTableName");
//if you still want to partition, write the data as a Parquet file and create a Hive external table over it
//(some partition functions are not supported by Hive), or create a view using Apache Drill on top of the
//Parquet file; a sketch follows after this class
//.write().partitionBy("transaction_date").parquet("/mapr/airavata/dev/nextgen/rjr/transaction_line_items_full_partitioned_by_Date_equi_join.parquet");
}
finally
{
sc.close();
}
System.out.println("Hive Table created");
}
}
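As the comments in the code note, when partitioned saveAsTable is not supported by the target Hive version, one alternative (a sketch only, reusing joinedDF and sqlContext from the class above; the path and table DDL are placeholders) is to write a partitioned Parquet file and put a Hive external table or a Drill view on top of it:
// Write the joined data as Parquet, partitioned by date; Spark lays out one directory per partition value.
joinedDF.write().mode(SaveMode.Overwrite)
    .partitionBy("transaction_date")
    .parquet("/targetPath/joined_partitioned.parquet");
// A Hive external table can then be created over that location, for example:
// CREATE EXTERNAL TABLE schemaName.targetHiveTableName (...) PARTITIONED BY (transaction_date DATE)
// STORED AS PARQUET LOCATION '/targetPath/joined_partitioned.parquet';
// followed by MSCK REPAIR TABLE (or ALTER TABLE ... ADD PARTITION) so Hive picks up the partition directories.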
hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)
# Note: repeating --conf spark.executor.extraClassPath lets the later value override the earlier one,
# so the HBase and Hadoop classpaths are combined into a single value here.
sudo -u userName /spark-1.5.2/bin/spark-submit --name jobName \
--class packagename.SparkCreateSomeJoinedTableInHiveFromHbase \
--master yarn --deploy-mode client \
--num-executors 8 --total-executor-cores 32 --executor-memory 2G \
--conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}" \
--conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}" \
--jars "/path/projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar" \
/path/projectName-0.0.1-SNAPSHOT.jar