Spark

Handy Methods in SparkContext Object while writing Spark Applications

SparkContext is the main entry point for Spark functionality. It is a class in the Spark framework that, once initialized, gives access to the Spark libraries. A SparkContext is responsible for connecting to the Spark cluster; it can be used to create RDDs (Resilient Distributed Datasets), to broadcast variables on that cluster, and it exposes many more useful methods. To create or initialize a SparkContext, a SparkConf needs to be created beforehand. SparkConf is the class used to set configurations for a Spark application, such as the master and the application name. Creating a SparkContext:

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("yarn")
sc = SparkContext(conf=conf)

In recent versions of Spark, the SparkContext is also available through SparkSession (a class in the Spark SQL component and the main entry point for DataFrame and SQL functionality).
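As a minimal sketch of that newer style, you normally build a SparkSession first and pull the SparkContext out of it; the application name below is just a placeholder:

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; appName and master are example values
spark = SparkSession.builder \
    .appName("example-app") \
    .master("yarn") \
    .getOrCreate()

sc = spark.sparkContext            # the underlying SparkContext
rdd = sc.parallelize([1, 2, 3])    # e.g. create an RDD from a local collection
print(rdd.count())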

PySpark – Components

PySpark's core components include:
– Spark Core – the foundation on which all other functionality is built. Contains classes like SparkContext and RDD.
– Spark SQL – provides the API for structured data processing. Contains important classes like SparkSession, DataFrame and Dataset.
– Spark Streaming – provides streaming data processing using a micro-batching technique. Contains classes like StreamingContext and DStream.
– Spark ML – provides the API for implementing machine learning algorithms.
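As a quick orientation, the sketch below shows where the main classes of each component live in the pyspark package (Dataset is only exposed in the Scala/Java API, so it is not imported here):

from pyspark import SparkConf, SparkContext, RDD         # Spark Core
from pyspark.sql import SparkSession, DataFrame           # Spark SQL
from pyspark.streaming import StreamingContext, DStream   # Spark Streaming
from pyspark.ml import Pipeline                           # Spark ML (DataFrame-based API)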

How to Retrieve Password from JCEKS file in Spark

During the data ingestion stage into Hadoop from RDBMS sources, a password is usually required to read the source tables in the RDBMS databases. Hard-coding the password directly is unsafe and bad practice in real applications, so the password can be protected by storing it in a JCEKS file. JCEKS is a keystore file saved in the Java Cryptography Extension KeyStore (JCEKS) format; it is used as an alternative to the Java KeyStore (JKS) format for the Java platform and stores encoded keys. When working on a Spark application that reads from RDBMS sources, the password has to be retrieved from the JCEKS file before querying the source tables. Below is a handy way to retrieve a password from a JCEKS file, in PySpark or Scala.
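A rough PySpark sketch of the idea, using Hadoop's CredentialProvider API; the JCEKS path and the password alias below are placeholders for your own values, and _jsc is Spark's internal Java gateway to the Hadoop Configuration:

# Assumes an existing SparkSession `spark`; path and alias are example values
jceks_path = "jceks://hdfs/user/example/passwords.jceks"
password_alias = "mydb.password.alias"

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("hadoop.security.credential.provider.path", jceks_path)

# Configuration.getPassword returns a Java char[]; stitch it back into a string
chars = hadoop_conf.getPassword(password_alias)
password = "".join(str(chars[i]) for i in range(len(chars)))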

Joins in Spark SQL – Shuffle Hash, Sort Merge, Broadcast

The Apache Spark SQL component comes with the Catalyst optimizer, which optimizes jobs by re-arranging the order of transformations and by choosing specialized join strategies based on the datasets involved. Spark picks these joins internally, but you can also force a particular strategy. The topic is worth knowing, since it comes to the rescue when tuning jobs for your use case.
Shuffle Hash Join: a shuffle hash join shuffles the data based on the join keys and then performs the join. It ensures that the data on each partition contains the same keys by partitioning the second dataset with the same default partitioner as the first, so that keys with the same hash value from both datasets land in the same partition. It follows the classic map-reduce pattern: first ...
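As a rough sketch of how to nudge Spark toward a particular strategy (table and key names below are placeholders): the broadcast() function hints a broadcast join, and in Spark 3.x join hints such as "shuffle_hash" and "merge" are also available.

from pyspark.sql.functions import broadcast

# Assumes an existing SparkSession `spark` and two example tables
big_df = spark.table("sales")         # placeholder large fact table
small_df = spark.table("countries")   # placeholder small dimension table

# Force a broadcast join of the small table
joined_broadcast = big_df.join(broadcast(small_df), "country_id")

# Spark 3.x join hints: request a shuffle hash join or a sort merge join
joined_shuffle_hash = big_df.join(small_df.hint("shuffle_hash"), "country_id")
joined_sort_merge = big_df.join(small_df.hint("merge"), "country_id")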

Memory Management in Spark and its tuning

Spark has two kinds of memory:
1. Execution memory – used to store temporary data for shuffles, joins, sorts, and aggregations.
2. Storage memory – used to cache RDDs and DataFrames.
An executor has some amount of total memory, which is divided into two parts, the execution region and the storage region. This is governed by two configuration options:
1. spark.executor.memory – the total amount of memory available to each executor. It is 1 gigabyte by default.
2. spark.memory.fraction – the fraction of that memory available for execution and storage (0.6 by default).
In early versions of Spark these two regions were fixed in size, so if your job filled all the execution space, Spark had to spill data to disk, reducing the performance of the application. On the other hand, if your...
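For reference, a minimal sketch of setting these options from code (the values are only examples, not recommendations; in practice they are usually passed via spark-submit or spark-defaults.conf, since executor memory must be fixed before the executors launch):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set("spark.executor.memory", "4g")            # total memory per executor
        .set("spark.memory.fraction", "0.6")           # share used for execution + storage
        .set("spark.memory.storageFraction", "0.5"))   # related option: portion of that share protected for storage

spark = SparkSession.builder.config(conf=conf).getOrCreate()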

How to flatten JSON in Spark Dataframe

How do you flatten a whole JSON document that contains both ArrayType and StructType fields? Spark has no predefined function to flatten a JSON completely, but we can write our own. The function accepts a DataFrame and, for each field, inspects its DataType. If the field is an ArrayType, it creates a new column by exploding the array column with Spark's explode_outer function. If the field is a StructType, it creates a new parentfield_childfield column for each field inside the struct. The function is recursive: once it no longer finds any ArrayType or StructType field, it returns the flattened DataFrame; otherwise it keeps iterating through the schema until the JSON is completely flattened out.
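A minimal sketch of such a function; this is a common iterative variant of the same idea (the input DataFrame is assumed to come from spark.read.json or similar):

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

def flatten_df(df):
    """Flatten all StructType and ArrayType columns of a DataFrame."""
    # Collect the complex (nested) fields still present in the schema
    complex_fields = {f.name: f.dataType for f in df.schema.fields
                      if isinstance(f.dataType, (ArrayType, StructType))}
    while complex_fields:
        name, dtype = next(iter(complex_fields.items()))
        if isinstance(dtype, StructType):
            # Promote each child to a top-level parentfield_childfield column
            expanded = [col(name + "." + child.name).alias(name + "_" + child.name)
                        for child in dtype.fields]
            df = df.select("*", *expanded).drop(name)
        elif isinstance(dtype, ArrayType):
            # Explode the array so each element becomes its own row (keeps nulls)
            df = df.withColumn(name, explode_outer(col(name)))
        # Recompute, since expanding/exploding can expose new nested fields
        complex_fields = {f.name: f.dataType for f in df.schema.fields
                          if isinstance(f.dataType, (ArrayType, StructType))}
    return df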

How to create Spark Dataframe on HBase table [Code Snippets]

There is no direct library to create a DataFrame on an HBase table the way we read a Hive table with Spark SQL. This post shows a way to create a DataFrame on top of an HBase table. You need to add the hbase-client dependency to achieve this; the dependency is available here: https://mvnrepository.com/artifact/org.apache.hbase/hbase-client/2.1.0 Let's say the HBase table is 'emp', with rowKey 'empID' and columns 'name' and 'city' under the column family 'metadata'. A case class, EmpRow, is used to give structure to the DataFrame. newAPIHadoopRDD is the API available in Spark to create an RDD on HBase; the configuration needs to be passed as shown below. The DataFrame is created when you map this RDD onto the case class. ...

How to Add Serial Number to Spark Dataframe

You may sometimes be required to add a serial number to a Spark DataFrame. One option is the Spark function monotonically_increasing_id(), which generates a new column with a unique 64-bit monotonically increasing index for each row. However, the values are not sequential, because the sequence depends on the partition; in short, the numbers assigned are unique but out of order. If the goal is to add a true serial number to the DataFrame, you can use the zipWithIndex method available on RDDs. Below is how you can achieve the same on a DataFrame (a complete sketch follows the snippet).

[code lang="python"]
from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(),
    but on a dataframe and preserves a ...
[/code]
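The snippet above is cut off; a complete, commonly used version of the same idea looks roughly like this (it assumes an existing SparkSession named spark):

from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
    Enumerates DataFrame rows in native order, like rdd.zipWithIndex(),
    but on a DataFrame, and preserves the original schema.
    '''
    # New schema: the serial-number column followed by the original columns
    new_schema = StructType([StructField(colName, LongType(), True)] + df.schema.fields)

    # zipWithIndex produces (Row, index) pairs; move the index to the front
    zipped = df.rdd.zipWithIndex()
    new_rdd = zipped.map(lambda pair: [pair[1] + offset] + list(pair[0]))

    return spark.createDataFrame(new_rdd, new_schema)

# Usage sketch
# df_with_id = dfZipWithIndex(spark.range(5).toDF("value"))
# df_with_id.show()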

Deep dive into Partitioning in Spark – Hash Partitioning and Range Partitioning

Partitions: the data within an RDD is split into several partitions.
Properties of partitions:
– Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine.
– Each machine in the cluster contains one or more partitions.
– The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes.
Two kinds of partitioning are available in Spark:
– Hash partitioning
– Range partitioning
Customizing a partitioning is only possible on pair RDDs.
Hash partitioning: given a pair RDD that should be grouped:

val purchasesPerCust = purchasesRdd
  .map(p => (p.customerId, p.price)) // Pair RDD
  .groupByKey()

groupByKey first computes for each tuple (k, v) its partition p: p = k.hashCode() % numPartitions ...
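A small PySpark illustration of the two schemes (the data and partition counts are just examples): partitionBy on a pair RDD applies hash partitioning with the requested number of partitions, while repartitionByRange on a DataFrame applies range partitioning.

# Assumes an existing SparkSession `spark`
sc = spark.sparkContext

# Hash partitioning on a pair RDD: keys are hashed into 4 partitions
pairs = sc.parallelize([("cust1", 10.0), ("cust2", 5.0), ("cust1", 7.5)])
hash_partitioned = pairs.partitionBy(4)          # hashes the key by default
grouped = hash_partitioned.groupByKey()

# Range partitioning on a DataFrame: rows are split into ranges of `id`
df = spark.range(0, 100)
range_partitioned = df.repartitionByRange(4, "id")
print(range_partitioned.rdd.getNumPartitions())  # resulting number of partitions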

Spark runtime Architecture – How Spark Jobs are executed

How Spark Jobs are Executed: a Spark application is a set of processes running on a cluster, all coordinated by the driver program. The driver is:
– the process where the main() method of your program runs;
– the process running the code that creates a SparkContext, creates RDDs, and stages up or sends off transformations and actions.
The processes that run computations and store data for your application are the executors. Executors:
– run the tasks that represent the application;
– return computed results to the driver;
– provide in-memory storage for cached RDDs.
Execution of a Spark program:
1. The driver program runs the Spark application, which creates a SparkContext upon start-up.
2. The SparkContext connects to a cluster manager (e.g., Mesos/YARN), which allocates resour...

How to write current method name to log in Scala [Code Snippet]

You will have many methods in your application framework, and if you want to trace and log the current method name, the code below will be helpful.

def getCurrentMethodName: String = Thread.currentThread.getStackTrace()(2).getMethodName

def test {
  println("you are in - " + getCurrentMethodName)
  println("this is doing some functionality")
}

test

Output:
you are in - test
this is doing some functionality

How to Calculate total time taken for a particular method in Spark [Code Snippet]

In cases where you apply joins in a Spark application, you might want to know the time taken to complete a particular join. The code snippet below might come in handy.

import java.util.Date

val current = new Date().getTime
println(current)
Thread.sleep(30000)
val end = new Date().getTime
println(end)
println("time taken " + (end - current).toFloat / 60000 + " mins")

Output:
current: Long = 1520502573995
end: Long = 1520502603996
time taken 0.5000167 mins

All you need to do is get the current time before the method starts, get the current time again after the method ends, and then take the difference to get the total time taken by that particular method. Hope this code snippet helps!
