Ways to create a DataFrame in Apache Spark
A DataFrame is conceptually a table: a two-dimensional structure with named columns, where different columns may hold different data types but every value within a single column has the same type. When working with Spark you will frequently need to create DataFrames and work with them. Under the hood a DataFrame is a distributed collection of data organized into named columns, and it can be created in the following ways (a short illustration follows the list):
1) Using Case Class
2) Using createDataFrame method
3) Using SQL method (reading from Hive)
4) Using read/load methods
i) From flat files (JSON, CSV)
ii) From RDBMS databases
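Before walking through each method, here is a minimal sketch of what a DataFrame looks like, built from a local collection with assumed column names (it relies on the sqlContext that spark-shell provides, or the one created in method 1 below):
// A DataFrame with two named, typed columns built from a local Seq
import sqlContext.implicits._
val sample = Seq(("Sai", 3000), ("Hari", 5000)).toDF("name", "sal")
sample.printSchema()   // shows the column names and their types
sample.show()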
1) Using Case Class
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Employee(name: String, sal: Int)
Below is the sample data available in emp.txt:
Sai, 3000
Hari, 5000
Mahesh, 6000
val emp = sc.textFile("/Users/sai/Desktop/Files/emp.txt").
  map(_.split(",")).map(p => Employee(p(0), p(1).trim.toInt)).toDF()
emp.registerTempTable("employees")
val empDF = sqlContext.sql("SELECT name, sal FROM employees")
empDF.map(t => "Name: " + t(0)).collect().foreach(println)
empDF.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
empDF.map(_.getValuesMap[Any](List("name", "sal"))).collect().foreach(println)
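For reference, on Spark 2.x the same case-class approach works through SparkSession; a sketch assuming the spark session provided by spark-shell (createOrReplaceTempView supersedes registerTempTable):
import spark.implicits._
case class Employee(name: String, sal: Int)
// Build the DataFrame from the same text file via the SparkContext held by the session
val empDF2 = spark.sparkContext.textFile("/Users/sai/Desktop/Files/emp.txt")
  .map(_.split(","))
  .map(p => Employee(p(0), p(1).trim.toInt))
  .toDF()
empDF2.createOrReplaceTempView("employees")
spark.sql("SELECT name, sal FROM employees").show()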
2) Using createDataFrame method (specifying the schema)
val people = sc.textFile("/Users/sai/Desktop/Files/emp.txt")
val schemaString = "name,sal"
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val empDF = sqlContext.createDataFrame(rowRDD, schema)
empDF.show()
empDF.printSchema()
empDF.registerTempTable("emp")
val results = sqlContext.sql("SELECT name FROM emp")
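The result of the SQL query is itself a DataFrame and can be inspected the same way:
results.show()
results.printSchema()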
3) Reading from Hive (SQL method)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveDF = hiveContext.sql("select * from emp")
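On Spark 2.x, HiveContext is superseded by a SparkSession built with Hive support; a minimal sketch, assuming the emp table already exists in the Hive metastore:
import org.apache.spark.sql.SparkSession
// Build (or reuse) a session that can talk to the Hive metastore
val spark = SparkSession.builder()
  .appName("HiveRead")
  .enableHiveSupport()
  .getOrCreate()
val hiveDF2 = spark.sql("select * from emp")
hiveDF2.show()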
4) Using read/load methods
Reading the JSON file and creating DataFrame:
val df = sqlContext.read.json("/Users/sai/Desktop/Files/empjson.txt")
df.show()
df.printSchema()
df.select("name").show()
df.select(df("name"), df("sal") + 1).show()
df.filter(df("sal") > 3000).show()
df.groupBy("sal").count().show()
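Note that read.json expects one JSON object per line (JSON Lines format) rather than a single pretty-printed document; a hypothetical empjson.txt consistent with the sample data above could look like this:
{"name": "Sai", "sal": 3000}
{"name": "Hari", "sal": 5000}
{"name": "Mahesh", "sal": 6000}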
Reading CSV files with header and creating DataFrame:
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.1.0
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/Users/sai/Desktop/Files/emp.txt")
df.printSchema()
Note: From Spark 2.0 onwards you no longer need the Databricks spark-csv package; a CSV reader is built into the DataFrameReader API:
val df = spark.read.option("header", "true").csv("/Users/sai/Desktop/Files/emp.txt")
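By default every CSV column is read as a string; the built-in reader can also be asked to infer column types, as in this optional sketch:
val dfTyped = spark.read
  .option("header", "true")
  .option("inferSchema", "true")   // infers sal as a numeric column instead of string
  .csv("/Users/sai/Desktop/Files/emp.txt")
dfTyped.printSchema()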
Creating DataFrame from RDBMS databases
MySQL-
(Required JAR to connect to MySQL: mysql-connector-java.jar)
val df_mysql = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "person")   // replace with your own table
  .option("user", "root")        // replace with your own user
  .option("password", "pwd")     // replace with your own password
  .load()
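The connector JAR must be visible to the driver and executors; a common way (a sketch, the JAR path is an assumption) is to pass it when launching spark-shell:
./bin/spark-shell --jars /path/to/mysql-connector-java.jar
Once loaded, the table behaves like any other DataFrame:
df_mysql.printSchema()
df_mysql.show()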
DB2-
(Required JAR to connect to DB2: db2jcc4.jar)
val df_db2 = spark.read.format("jdbc")
  .option("url", "jdbc:db2://localhost:50000/dbname")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("dbtable", "person")   // replace with your own table
  .option("user", "root")        // replace with your own user
  .option("password", "pwd")     // replace with your own password
  .load()
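Instead of a whole table, the dbtable option also accepts a parenthesized subquery, which pushes the projection and filtering down to the database; a sketch with assumed column names:
val df_db2_sal = spark.read.format("jdbc")
  .option("url", "jdbc:db2://localhost:50000/dbname")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("dbtable", "(SELECT name, sal FROM person WHERE sal > 3000) AS t")  // subquery runs in DB2
  .option("user", "root")
  .option("password", "pwd")
  .load()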