Memory Management in Spark and its tuning

Spark has two kinds of memory:
1. Execution memory, which is used to store temporary data for shuffles, joins, sorts, and aggregations.
2. Storage memory, which is used to cache RDDs and DataFrames.

Each executor has a total amount of memory, which is divided into two parts: the execution block and the storage block. This is governed by two configuration options:
1. spark.executor.memory: the total amount of memory available to each executor. It is 1 gigabyte by default.
2. spark.memory.fraction: the fraction of the total memory available for execution and storage.

In early versions of Spark, these two kinds of memory were fixed. If your job filled all the execution space, Spark had to spill data to disk, reducing the performance of the application. On the other hand, if your...
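To make the two options above concrete, here is a rough back-of-the-envelope sketch of how an executor's heap might be split. It is an illustration under stated assumptions, not Spark's own code: the object and method names are hypothetical, the 300 MB reserved region and the defaults of 0.6 and 0.5 reflect Spark 2.x behaviour as I understand it, and spark.memory.storageFraction is a setting the excerpt does not mention.

```scala
object UnifiedMemoryEstimate {
  // Assumption: Spark sets aside a fixed 300 MB of heap before applying any fraction.
  val ReservedMb: Double = 300.0

  final case class Regions(unifiedMb: Double, storageMb: Double, executionMb: Double)

  // executorMemoryMb corresponds to spark.executor.memory,
  // memoryFraction to spark.memory.fraction (assumed default 0.6),
  // storageFraction to spark.memory.storageFraction (assumed default 0.5).
  def estimate(executorMemoryMb: Double,
               memoryFraction: Double = 0.6,
               storageFraction: Double = 0.5): Regions = {
    val unified = (executorMemoryMb - ReservedMb) * memoryFraction
    val storage = unified * storageFraction
    Regions(unified, storage, unified - storage)
  }
}

// With the default 1 GB executor this gives roughly 434 MB shared between
// execution and storage, about half of it initially earmarked for storage.
val regions = UnifiedMemoryEstimate.estimate(1024)
println(regions)
```

The real figures will be somewhat lower, since the JVM reports slightly less usable heap than the configured size, and in newer Spark versions the boundary between the two regions is soft rather than fixed.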

How to flatten JSON in Spark Dataframe

How do you flatten a whole JSON document that contains ArrayType and StructType fields? Spark has no predefined function that flattens JSON completely, so we write our own. The function accepts a DataFrame and inspects the DataType of each field. If a field is an ArrayType, we create a new column by exploding the array column with Spark’s explode_outer function. If a field is a StructType, we create a new column named parentfield_childfield for each field in the struct. The function is recursive: once it no longer finds any ArrayType or StructType, it returns the flattened DataFrame. Otherwise, it keeps iterating through the schema to completely flatten out the JSON...
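The recursion described above could be sketched roughly as follows. This is not the post's original function (the excerpt is cut off before it); the names JsonFlattener and flatten are hypothetical, and the sketch assumes Spark 2.2 or later, where explode_outer is available.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StructType}

object JsonFlattener {
  @annotation.tailrec
  def flatten(df: DataFrame): DataFrame = {
    // Look for the first column that is still an array or a struct.
    val nested = df.schema.fields.find { f =>
      f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType]
    }
    nested match {
      case None => df // nothing nested is left: the DataFrame is flat
      case Some(field) =>
        val name = field.name
        val next = field.dataType match {
          case _: ArrayType =>
            // One row per array element; explode_outer keeps rows whose array is null or empty.
            df.withColumn(name, explode_outer(col(s"`$name`")))
          case st: StructType =>
            // Promote every child field to a top-level column named parent_child.
            val others = df.columns.filter(_ != name).map(c => col(s"`$c`"))
            val children = st.fieldNames.map(c => col(s"`$name`.`$c`").as(s"${name}_$c"))
            df.select(others ++ children: _*)
        }
        flatten(next)
    }
  }
}
```

Something like `JsonFlattener.flatten(spark.read.json(path))` would then return a DataFrame with only flat columns, where `spark` and `path` stand for your own session and input. Note that exploding several independent arrays multiplies the row count, and that a generated parent_child name could collide with an existing column; the sketch does not guard against either.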

How to create Spark Dataframe on HBase table[Code Snippets]

There is no direct library for creating a DataFrame on an HBase table the way we read a Hive table with Spark SQL. This post shows one way to create a DataFrame on top of an HBase table. You need to add the hbase-client dependency to achieve this. Below is the link to get the dependency. Let’s say the HBase table is ‘emp’, with ‘empID’ as the row key and the columns ‘name’ and ‘city’ under the column family named ‘metadata’. The case class EmpRow is used to give structure to the DataFrame. newAPIHadoopRDD is the API available in Spark for creating an RDD on HBase; the configuration needs to be passed as shown below. The DataFrame is created when you map this RDD onto the case class. ...

How to Add Serial Number to Spark Dataframe

You may sometimes need to add a serial number to a Spark DataFrame. One option is the Spark function monotonically_increasing_id(), which generates a new column with a unique, monotonically increasing 64-bit ID for each row. However, the IDs are not consecutive: the sequence depends on the partition each row belongs to, so the values jump between partitions and do not form a 1, 2, 3, ... serial number. If the goal is to add a true serial number to the DataFrame, you can use the zipWithIndex method available on RDDs and apply it to the DataFrame through its underlying RDD. Credits: stackoverflow
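A minimal sketch of the zipWithIndex approach is shown here. The original snippet is not part of this excerpt, so the object name, the method name, and the default column name serial_no are all hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object SerialNumber {
  def addSerialNumber(df: DataFrame, colName: String = "serial_no"): DataFrame = {
    // zipWithIndex pairs every row with a consecutive 0-based index across partitions;
    // adding 1 makes the serial number start at 1.
    val indexedRows = df.rdd.zipWithIndex().map { case (row, idx) =>
      Row.fromSeq(row.toSeq :+ (idx + 1))
    }
    // The new schema is the old one plus a non-nullable long column at the end.
    val schema = StructType(df.schema.fields :+ StructField(colName, LongType, nullable = false))
    df.sparkSession.createDataFrame(indexedRows, schema)
  }
}
```

Keep in mind that zipWithIndex triggers a Spark job when the RDD has more than one partition, and that the numbering follows the current row order of the DataFrame, so sort first if a specific order matters.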

Deep dive into Partitioning in Spark – Hash Partitioning and Range Partitioning

Partitions: The data within an RDD is split into several partitions.

Properties of partitions:
- Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine.
- Each machine in the cluster contains one or more partitions.
- The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes.

Two kinds of partitioning are available in Spark:
- Hash partitioning
- Range partitioning

Customizing a partitioning is only possible on pair RDDs.

Hash partitioning: Given a pair RDD that should be grouped:

    val purchasesPerCust =
      purchases.map(p => (p.customerId, p.price)) // Pair RDD
               .groupByKey()

groupByKey first computes per tuple (k, v) its partition p: p = k....
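The excerpt breaks off in the middle of the formula, so the following is only a sketch of how hash partitioning is commonly defined: the key's hash code modulo the number of partitions, adjusted so the result is never negative. It runs on plain Scala collections rather than RDDs, and the object and method names are hypothetical.

```scala
object HashPartitioningSketch {
  // Partition index for a key: hashCode modulo numPartitions, kept non-negative.
  // Null keys are sent to partition 0 (an assumption of this sketch).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = if (key == null) 0 else key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Groups key-value pairs by the partition their key hashes to.
  def assign[K, V](pairs: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    pairs.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}

// Customer ids 1 and 5 land in the same partition when there are 4 partitions.
val byPartition = HashPartitioningSketch.assign(Seq((1, 10.0), (5, 2.0), (2, 3.0)), 4)
println(byPartition)
```

All tuples with the same key always hash to the same partition, which is what lets a grouping operation collect them on one machine.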

Spark runtime Architecture – How Spark Jobs are executed

How Spark jobs are executed: A Spark application is a set of processes running on a cluster. All these processes are coordinated by the driver program.

The driver is:
- the process where the main() method of your program runs.
- the process running the code that creates a SparkContext, creates RDDs, and stages up or sends off transformations and actions.

The processes that run computations and store data for your application are executors.

Executors:
- Run the tasks that represent the application.
- Return computed results to the driver.
- Provide in-memory storage for cached RDDs.

Execution of a Spark program:
1. The driver program runs the Spark application, which creates a SparkContext upon start-up.
2. The SparkContext connects to a cluster manager (e.g., Mesos/YARN) which allocates resour...

How to write Current method name to log in Scala[Code Snippet]

Your application framework will likely have many methods, and if you want to trace and log the name of the current method, the code below will be helpful.

    // Frame 2 of the stack trace is normally the caller:
    // frame 0 is getStackTrace and frame 1 is getCurrentMethodName itself.
    def getCurrentMethodName: String = Thread.currentThread.getStackTrace()(2).getMethodName

    def test(): Unit = {
      println("you are in - " + getCurrentMethodName)
      println("this is doing some functionality")
    }

    test()

Output:

    you are in - test
    this is doing some functionality

How to Calculate total time taken for particular method in Spark[Code Snippet]

When your Spark application performs joins, you might want to know how long a particular join takes to complete. The code snippet below can help with that.

    import java.util.Date

    val curent = new Date().getTime   // time before the work starts, in ms since the epoch
    println(curent)
    Thread.sleep(30000)               // stand-in for the method being timed
    val end = new Date().getTime      // time after the work ends
    println(end)
    println("time taken " + (end - curent).toFloat / 60000 + "mins")

Output:

    import java.util.Date
    curent: Long = 1520502573995
    end: Long = 1520502603996
    time taken 0.5000167mins

All you need to do is record the current time before the method starts and again after it ends, then take the difference to get the total time the method took. Hope this code snippet helps!!
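The same before-and-after idea can be wrapped in a small reusable helper. This is a sketch, not part of the original post: the names Timing and timed are hypothetical, and it uses System.nanoTime instead of java.util.Date because nanoTime is intended for measuring elapsed time.

```scala
object Timing {
  // Runs `block`, prints how long it took, and returns the result with the elapsed milliseconds.
  def timed[T](label: String)(block: => T): (T, Double) = {
    val start = System.nanoTime()
    val result = block // the by-name argument is evaluated here
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"$label took $elapsedMs%.1f ms")
    (result, elapsedMs)
  }
}

// Any expression can be wrapped; here a short sleep stands in for a join.
val (answer, elapsedMs) = Timing.timed("sleep") { Thread.sleep(50); 42 }
```

When timing Spark code, remember that transformations are lazy: the block has to contain an action such as count() or a write, otherwise only the time to build the plan is measured.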

How to write current date timestamp to log file in Scala[Code Snippet]

Scala doesn’t have its own library for dates and timestamps, so we need to depend on Java libraries. Here is a quick method to get the current date-timestamp and format it as required. Please note that all the code is in Scala, so it can be used while writing a Scala application.

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import java.util.Calendar

    def getCurrentdateTimeStamp: Timestamp = {
      val today: java.util.Date = Calendar.getInstance.getTime
      val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // Formatting and re-parsing drops the milliseconds.
      val now: String = timeFormat.format(today)
      val re = java.sql.Timestamp.valueOf(now)
      re
    }

Output:

    import java.sql.Timestamp
    getCurrentdateTimeStamp: java.sql.Timestamp

    getCurrentdateTimeStamp
    res0: java.sql.Timestamp = 2018-03-18 07:48:00.0
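On Java 8 or later, the java.time API offers a shorter route to the same formatted string. This is an alternative technique rather than the post's method, and the names LogTimestamp, now, and logLine are hypothetical.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object LogTimestamp {
  // Same pattern as the snippet above; DateTimeFormatter is immutable and thread-safe.
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Current local date and time as a formatted string.
  def now(): String = LocalDateTime.now().format(formatter)

  // Prefixes a log message with the current timestamp.
  def logLine(message: String): String = s"[${now()}] $message"
}

println(LogTimestamp.logLine("job started"))
```

Unlike SimpleDateFormat, the formatter can safely be shared across threads, which matters when several parts of an application write to the same log.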

Common issues with Apache Spark

Tricky deployment: Once you’re done writing your app, you have to deploy it, right? That’s where things get a little out of hand. Although there are many options for deploying your Spark app, the simplest and most straightforward approach is standalone deployment. Spark also supports Mesos and YARN, so if you’re not familiar with one of those it can become quite difficult to understand what’s going on. You might face some initial hiccups when bundling dependencies as well. If you don’t do it correctly, the Spark app will work in standalone mode but you’ll encounter classpath exceptions when running in cluster mode.

Memory issues: As Apache Spark is built to process huge chunks of data, monitoring and measuring memory usage is critical. While Spark works just fine for normal usage, it has got tons of...

Comparison between Apache Spark and Apache Hadoop

Spark Hadoop comparison: Below is a comparison between Spark and Hadoop.

They do different things: Hadoop and Apache Spark are both big-data frameworks, but they don’t really serve the same purposes. Hadoop is essentially a distributed data infrastructure: it distributes massive data collections across multiple nodes within a cluster of commodity servers, which means you don’t need to buy and maintain expensive custom hardware. It also indexes and keeps track of that data, enabling big-data processing and analytics far more effectively than was possible previously. Spark, on the other hand, is a data-processing tool that operates on those distributed data collections; it doesn’t do distributed storage.

You can use one without the other: Hadoop includes not just a storage comp...

Version wise features of Apache Spark

Spark Release 2.1.0: The Apache Spark 2.1.0 release makes significant strides in the production readiness of Structured Streaming, with added support for event-time watermarks and Kafka 0.10. In addition, this release focuses more on usability, stability, and polish, resolving over 1200 tickets. Below is the list of high-level changes.

Core and Spark SQL:
- Supports from_json and to_json for parsing and generating JSON in string columns.
- Allows the use of DDL commands to manipulate partitions for tables stored with Spark’s native formats.
- Speeds up group-by aggregate performance by adding a fast aggregation cache that is backed by a row-based hashmap.

Structured Streaming:
- Adds Kafka 0.10 support in Structured Streaming.
- Supports all file f...
