How to create Spark DataFrame from different sources

Creating DataFrame from Scala List or Sequence In some cases in order to test our business logic we need to have DataFrame and in most cases we would have created DataFrame from a sample file. Instead of doing that we can create a List of our sample data and we can convert it to DataFrame. Note : spark.implicits._ will be available in spark-shell by default. In case if we want to test in IDE we should import spark.implicits._ explicitly. From CSV Source From Parquet Source From Avro Source From JSON Source Using Spark StructType schema to create DataFrame on File Sources Using Spark StructType JSON Schema to create DataFrame on File Sources In some cases we may require to have a external StructType Schema in such cases we can define the StructType as JSON and store it as file and during ru...

How to Retrieve Password from JCEKS file in Spark

In the data ingestion stage into Hadoop from RBDMS sources, it often requires password to hit source tables in RDBMS databases. Passing hard password directly is highly unsafe and bad practice in real time applications. So, password can be encrypted by creating JCEKS file. JCEKS is basically a keystore file saved in the Java Cryptography Extension KeyStore (JCEKS) format; used as an alternative keystore to the Java Keystore (JKS) format for the Java platform; stores encoded keys. When working on Spark application which deals with RDBMS sources JCEKS need to be decrypted to query the source tables. Below is the handy function to retrieve password from JCEKS file- Using PySpark Using Scala

Joins in Spark SQL- Shuffle Hash, Sort Merge, BroadCast

Apache Spark SQL component comes with catalyst optimizer which smartly optimizes the jobs by re-arranging the order of transformations and by implementing some special joins according to datasets. Spark performs these joins internally or you can force it to perform them. It’s worthwhile to know this topic, so that it comes to rescue when optimizing the jobs according to your use case. Shuffle Hash Join Shuffle hash join shuffles the data based on join keys and then perform the join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. It follows the classic map-reduce pattern: First ...

How to Add Serial Number to Spark Dataframe

You may required to add Serial number to Spark Dataframe sometimes. It can be done with the spark function called monotonically_increasing_id(). It generates a new column with unique 64-bit monotonic index for each row. But it isn’t significant, as the sequence changes based on the partition. In short,  random numbers will be assigned which are out of sequence. If the goal is add serial number to the dataframe, you can use zipWithIndex method available on RDD. below is how you can achieve the same on dataframe. [code lang=”python”] from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rowId"): ”’ Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe and preserves a ...

Deep dive into Partitioning in Spark – Hash Partitioning and Range Partitioning

Partitions- The data within an RDD is split into several partitions. Properties of partitions: – Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine. – Each machine in the cluster contains one or more partitions. – The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes. Two kinds of partitioning available in Spark: – Hash partitioning – Range partitioning Customizing a partitioning is only possible on Pair RDDs. Hash partitioning- Given a Pair RDD that should be grouped: val purchasesPerCust = -> (p.customerId, p.price)) // Pair RDD .groupByKey() groupByKey first computes per tuple (k, v) its partition p: p = k....

Spark runtime Architecture – How Spark Jobs are executed

How Spark Jobs are Executed- A Spark application is a set of processes running on a cluster. All these processes are coordinated by the driver program. The driver is: -the process where the main() method of your program run. -the process running the code that creates a SparkContext, creates RDDs, and stages up or sends off transformations and actions. These processes that run computations and store data for your application are executors. Executors: -Run the tasks that represent the application. -Return computed results to the driver. -Provide in-memory storage for cached RDDs. Execution of a Spark program: 1. The driver program runs the Spark application, which creates a SparkContext upon start-up. 2. The SparkContext connects to a cluster manager (e.g., Mesos/YARN) which allocates resour...

Word count program in Spark

WordCount in Spark WordCount program is like basic hello world program when it comes to Big data world. Below is program to achieve wordCount in Spark with very few lines of code. [code lang=”scala”]val inputlines = sc.textfile("/users/guest/read.txt") val words = inputlines.flatMap(line=>line.split(" ")) val wMap = => (word,1)) val wOutput = wMap.reduceByKey(_ + _) wOutput.saveAsTextFile("/users/guest/")[/code]

Lost Password


24 Tutorials