Apache Spark

Memory Management in Spark and its tuning

Spark has two kinds of memory- 1.Execution Memory which is used to store temporary data of shuffles, joins, sorts, and aggregations 2. Storage Memory     which is used to cache RDDs and data frames Executor has some amount of total memory, which is divided into two parts, the execution block and the storage block.This is governed by two configuration options. 1. spark.executor.memory > It is the total amount of memory which is available to executors. It is 1 gigabyte by default 2. spark.memory.fraction > Fraction of the total memory available for execution and storage. In early version of Spark, these two kinds of memory were fixed. And if your job was to fill all the execution space, Spark had to spill data to disk, reducing performance of the application. On the other hand, if your...

How to flatten JSON in Spark Dataframe

How to flatten whole JSON containing ArrayType and StructType in it? In order to flatten a JSON completely we don’t have any predefined function in Spark. We can write our own function that will flatten out JSON completely. We will write a function that will accept DataFrame. For each field in the DataFrame we will get the DataType. If the field is of ArrayType we will create new column with exploding the ArrayColumn using Spark explode_outer function. If the field is of StructType we will create new column with parentfield_childfield for each field in the StructType Field. This is a recursive function. Once the function doesn’t find any ArrayType or StructType. It will return the flattened DataFrame. Otherwise, It will it iterate through the schema to completely flatten out the JSON...

How to create Spark Dataframe on HBase table[Code Snippets]

There is no direct library to create Dataframe on HBase table like how we read Hive table with Spark sql. This post gives the way to create dataframe on top of Hbase table. You need to add hbase-client dependency to achieve this. Below is the link to get the dependency. https://mvnrepository.com/artifact/org.apache.hbase/hbase-client/2.1.0 Lets say the hbase table is ’emp’ with rowKey as ’empID’ and columns are ‘name’ and ‘city’ under the column-family named – ‘metadata’. Case class -EmpRow is used in order to give the structure to the dataframe. newAPIHadoopRDD is the API available in Spark to create RDD on hbase, configurations need to passed as shown below. Dataframe will be created when you parse this RDD on case class. ...

How to Add Serial Number to Spark Dataframe

You may required to add Serial number to Spark Dataframe sometimes. It can be done with the spark function called monotonically_increasing_id(). It generates a new column with unique 64-bit monotonic index for each row. But it isn’t significant, as the sequence changes based on the partition. In short,  random numbers will be assigned which are out of sequence. If the goal is add serial number to the dataframe, you can use zipWithIndex method available on RDD. below is how you can achieve the same on dataframe. Credits: stackoverflow  

How to filter DataFrame based on keys in Scala List using Spark UDF [Code Snippets]

There are some situations where you are required to Filter the Spark DataFrame based on the keys which are already available in Scala collection. Let’s see how we can achieve this in Spark. You need to use spark UDF for this – Step -1: Create a DataFrame using parallelize method by taking sample data. scala> val df = sc.parallelize(Seq((2,"a"),(3,"b"),(5,"c"))).toDF("id","name") df: org.apache.spark.sql.DataFrame = [id: int, name: string] Step -2: Create a UDF which concatenates columns inside dataframe. Below UDF accepts a collection of columns and returns concatenated column separated by the given delimiter. scala> val concatKey = udf( (xs: Seq[Any], sep:String) => xs.filter(_ != null).mkString(sep)) concatKey: org.apache.spark.sql.UserDefinedFunction = UserDefinedFu...

Ways to create DataFrame in Apache Spark [Examples with Code]

Ways to create DataFrame in Apache Spark – DATAFRAME is the representation of a matrix but we can have columns of different datatypes or similar table with different rows and having different types of columns (values of each column will be same data type). When working with Spark most of the times you are required to create Dataframe and play around with it. DATAFRAME is nothing but a data structure which is stored in memory and can be created by following ways – 1)Using Case Class 2)Using createDataFrame method 3)Using SQL method 4)Using read..load methods i) From flat files(JSON, CSV) ii) From RDBMS Databases 1)Using Case Class val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ case class Employee(name: String, sal: Int) Below is the sample...

Lost Password

Register