package com.packagename;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SparkReadFromPostgres {

    public static void main(String[] args) {
        // JDBC options for Spark's data source. Note the option key is
        // "driver" -- Spark's JDBC source does not recognize "driverClassName".
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:postgresql://servername:5432/dbname");
        options.put("user", "username");
        options.put("password", "<somePassword>");
        options.put("driver", "org.postgresql.Driver");
        options.put("dbtable", "schema.tableName");

        SparkConf spconf = new SparkConf();
        spconf.set("spark.driver.maxResultSize", "3g");

        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(spconf);
            SQLContext sqlContext = new SQLContext(sc);

            // Load the Postgres table as a DataFrame over JDBC.
            DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();
            jdbcDF.printSchema();
            jdbcDF.show(10);

            // Optionally write the DataFrame back out as a new Postgres table.
            createPostGresTable(jdbcDF, "testTable");
        } finally {
            // Guard against the NullPointerException the original code could
            // throw here if the context failed to initialize.
            if (sc != null) {
                sc.close();
            }
        }
    }

    public static void createPostGresTable(DataFrame output, String postgresTableName) {
        String url = "jdbc:postgresql://serverName:5432/dbName";
        Properties props = new Properties();
        props.setProperty("user", "userName");
        props.setProperty("password", "password");
        props.setProperty("driver", "org.postgresql.Driver");
        // props.setProperty("ssl", "true"); // enable if the server requires SSL

        // "overwrite" drops and recreates the table if it already exists.
        output.write().mode("overwrite").jdbc(url, postgresTableName, props);
    }
}
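
For a large table, the same read can be parallelized. Spark's JDBC source accepts partitionColumn, lowerBound, upperBound, and numPartitions options (all four must be set together), and each partition then issues its own range query against Postgres. The following is a minimal sketch on top of the options map above; the column "id" and its bounds are hypothetical and must be replaced with a real numeric column and its actual min/max values:

// Hypothetical partitioning settings: "id" must be a numeric column in
// schema.tableName, and the bounds should span its actual range of values.
options.put("partitionColumn", "id");
options.put("lowerBound", "1");
options.put("upperBound", "1000000");
options.put("numPartitions", "8");
// Each of the 8 partitions now reads its own slice of the table in parallel.
DataFrame partitionedDF = sqlContext.read().format("jdbc").options(options).load();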
Command to Run the Spark Program
hadoop_classpath=$(hadoop classpath)
HBASE_CLASSPATH=$(hbase classpath)
# Passing the same --conf key more than once makes the last value win, so the
# executor and driver classpaths are each combined into one colon-separated value.
sudo -u userId /spark/spark-1.5.2/bin/spark-submit --name SparkReadFromPostgres --class com.packagename.SparkReadFromPostgres --master yarn --deploy-mode client --num-executors 8 --executor-cores 8 --executor-memory 4G --conf "spark.executor.extraClassPath=${HBASE_CLASSPATH}:${hadoop_classpath}:/sharedpath/postgresql-9.4.1208.jar" --conf "spark.driver.extraClassPath=${HBASE_CLASSPATH}:/sharedpath/postgresql-9.4.1208.jar" --jars /projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar /projectName-0.0.1-SNAPSHOT.jar
pom.xml dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.12-hadoop2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.12-hadoop2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>0.98.12-hadoop2</version>
  </dependency>
  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.4.1208</version>
  </dependency>
  <dependency>
    <groupId>com.databricks</groupId>
    <!-- _2.10, not _2.11, to match the Scala version of the Spark artifacts above -->
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.10</artifactId>
    <version>1.5.2</version>
  </dependency>
</dependencies>
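
The spark-submit command above references projectName-0.0.1-SNAPSHOT-jar-with-dependencies.jar. One common way to produce that artifact is the Maven Assembly Plugin with the standard jar-with-dependencies descriptor; this is a minimal sketch, assuming that is how the project jar is built:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <!-- Produces the *-jar-with-dependencies.jar used by spark-submit -->
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>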