Wednesday, July 20, 2016

How to Create a Spark DataFrame by Reading from PostgreSQL, and Write the Processed DataFrame Back to PostgreSQL/MySQL

package com.packagename;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SparkReadFromPostgres {

    public static void main(String[] args) {

        // JDBC options for the Spark data source. Note that the Spark JDBC
        // reader expects the key "driver" (not "driverClassName").
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:postgresql://servername:5432/dbname");
        options.put("user", "username");
        options.put("password", "<somePassword>");
        options.put("driver", "org.postgresql.Driver");
        options.put("dbtable", "schema.tableName");

        SparkConf spconf = new SparkConf();
        // Raise the cap on results collected back to the driver (default 1g).
        spconf.set("spark.driver.maxResultSize", "3g");

        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(spconf);
            SQLContext sqlContext = new SQLContext(sc);

            // Load the PostgreSQL table into a DataFrame.
            DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();

            jdbcDF.printSchema();
            jdbcDF.show(10);

            // If you want to write the DataFrame back to Postgres as a new table:
            createPostGresTable(jdbcDF, "testTable");
        } finally {
            // Guard against an NPE if the context failed to initialize.
            if (sc != null) {
                sc.close();
            }
        }
    }

    public static void createPostGresTable(DataFrame output, String postgresTableName) {
        String url = "jdbc:postgresql://serverName:5432/dbName";
        Properties props = new Properties();
        props.setProperty("user", "userName");
        props.setProperty("password", "password");
        //props.setProperty("ssl", "true");
        props.setProperty("driver", "org.postgresql.Driver");

        // SaveMode "overwrite" drops and recreates the table if it already exists.
        output.write().mode("overwrite").jdbc(url, postgresTableName, props);
    }
}
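By default the JDBC source pulls the whole table through a single connection. For large tables, the read can be parallelized with the partitioning options the JDBC source accepts (partitionColumn, lowerBound, upperBound, numPartitions). A minimal sketch, assuming the table has a numeric "id" column; the bounds are illustrative and only control how the id range is split across partitions, not which rows are read:

Map<String, String> parallelOptions = new HashMap<String, String>(options);
parallelOptions.put("partitionColumn", "id");   // assumed numeric column
parallelOptions.put("lowerBound", "1");         // illustrative range bounds
parallelOptions.put("upperBound", "1000000");
parallelOptions.put("numPartitions", "8");      // 8 concurrent JDBC reads

DataFrame partitionedDF = sqlContext.read().format("jdbc").options(parallelOptions).load();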

Command to Run the Spark Program

hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)


# All executor classpath entries must go in a single colon-separated
# spark.executor.extraClassPath value; repeating --conf for the same key
# simply overrides the earlier value.
sudo -u userId /spark/spark-1.5.2/bin/spark-submit \
  --name SparkReadFromPostgres \
  --class com.packagename.SparkReadFromPostgres \
  --master yarn \
  --deploy-mode client \
  --num-executors 8 \
  --executor-cores 8 \
  --executor-memory 4G \
  --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}:/sharedpath/postgresql-9.4.1208.jar" \
  --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}:/sharedpath/postgresql-9.4.1208.jar" \
  --jars /projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  /projectName-0.0.1-SNAPSHOT.jar
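If maintaining a shared driver jar path is inconvenient, spark-submit can instead resolve the PostgreSQL driver from Maven Central with --packages (the coordinates match the pom dependency below). A sketch of the alternative invocation:

/spark/spark-1.5.2/bin/spark-submit \
  --class com.packagename.SparkReadFromPostgres \
  --master yarn \
  --deploy-mode client \
  --packages org.postgresql:postgresql:9.4.1208 \
  /projectName-0.0.1-SNAPSHOT.jar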

pom.xml dependencies:

<dependencies>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.12-hadoop2</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.12-hadoop2</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>0.98.12-hadoop2</version>
  </dependency>

  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.4.1208</version>
  </dependency>

  <!-- The Scala suffix must match the other Spark artifacts (_2.10) -->
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>

</dependencies>
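Writing the same DataFrame to MySQL, as the title mentions, only changes the JDBC URL and driver class; the pom would additionally need the MySQL JDBC driver (e.g., mysql:mysql-connector-java) and the jar would have to be on the executor/driver classpath like the Postgres one above. A minimal sketch with hypothetical connection details:

public static void createMySqlTable(DataFrame output, String mysqlTableName) {
    // Hypothetical server/db/user values; substitute your own.
    String url = "jdbc:mysql://serverName:3306/dbName";
    Properties props = new Properties();
    props.setProperty("user", "userName");
    props.setProperty("password", "password");
    props.setProperty("driver", "com.mysql.jdbc.Driver");

    // Same write path as for PostgreSQL.
    output.write().mode("overwrite").jdbc(url, mysqlTableName, props);
}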

