Memory Management in Spark 1.6

Executors run as Java processes, so the available memory is equal to the heap size. Internally, the available memory is split into several regions with specific functions.

Execution Memory
- storage for data needed during task execution
- shuffle-related data

Storage Memory
- storage of cached RDDs and broadcast variables
- possible to borrow from execution memory (spill otherwise)
- safeguard value is 50% of Spark Memory, below which cached blocks are immune to eviction

User Memory
- user data structures and internal metadata in Spark
- safeguarding against OOM

Reserved Memory
- memory needed for running the executor itself, not strictly related to Spark
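The boundaries between these regions are controlled by a handful of configuration properties. A minimal sketch, assuming the Spark 1.6 unified memory manager and its default values:

[code lang="scala"]
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Plain executor heap; all of the regions above are carved out of it.
  .set("spark.executor.memory", "4g")
  // Fraction of the heap (after reserved memory) used for Spark Memory,
  // i.e. execution + storage. The remainder becomes User Memory.
  .set("spark.memory.fraction", "0.75")
  // Portion of Spark Memory where cached blocks are immune to eviction
  // (the 50% safeguard mentioned above).
  .set("spark.memory.storageFraction", "0.5")
[/code]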
Dataset API

The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds: the familiar object-oriented programming style and compile-time type safety of the RDD API, but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API. When it comes to serializing data, the Dataset API has the concept of encoders, which translate between JVM representations (objects) and Spark's internal binary format. Spark's built-in encoders are very advanced in that they generate bytecode to interact with off-heap data and provide on-demand access to individual attributes without having to deserialize an entire object. Spark does not yet provide an API for implementing custom ...
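As a rough sketch (assuming a Spark 1.6 SQLContext named sqlContext and a made-up Person case class), the built-in encoders are pulled in through the implicits import and resolved automatically when a Dataset is created:

[code lang="scala"]
import sqlContext.implicits._ // brings the built-in encoders into scope

case class Person(name: String, age: Int)

// The encoder for Person is resolved implicitly and handles the translation
// between Person objects and Spark's internal binary format.
val ds = Seq(Person("Ann", 32), Person("Bob", 41)).toDS()

// Operations stay typed: compile-time checked, no manual Row handling.
val adults = ds.filter(p => p.age >= 18)
[/code]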
DataFrame API

Spark 1.3 introduced a new DataFrame API as part of the Project Tungsten initiative, which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass the data between nodes, in a much more efficient way than using Java serialization. There are also advantages when performing computations in a single process, as Spark can serialize the data into off-heap storage in a binary format and then perform many transformations directly on this off-heap memory, avoiding the garbage-collection costs associated with constructing individual objects for each row in the data set. Because Spark understands the schema, there is no need to use Java serialization to encode the d...
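A minimal sketch (assuming a SQLContext named sqlContext and made-up column names) of letting Spark manage the schema instead of serializing whole objects:

[code lang="scala"]
import sqlContext.implicits._

// The schema (name: String, age: Int) is captured once, so Spark can keep rows
// in its compact binary format instead of Java-serialized objects.
val df = Seq(("Ann", 32), ("Bob", 41)).toDF("name", "age")

df.printSchema()
// Transformations operate directly on the binary representation.
df.filter($"age" > 35).select($"name").show()
[/code]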
DataFrame vs Dataset

Spark Release
- DataFrame: Spark 1.3
- Dataset: Spark 1.6

Data Representation
- DataFrame: a distributed collection of data organized into named columns; conceptually equivalent to a table in a relational database.
- Dataset: an extension of the DataFrame API that provides the type-safe, object-oriented programming interface of the RDD API together with the performance benefits of the Catalyst query optimizer and the off-heap storage mechanism of the DataFrame API.

Data Formats
- DataFrame: processes structured and unstructured data efficiently; organizes the data into named columns and allows Spark to manage the schema.
- Dataset: also processes structured and unstructured data efficiently; represents data in the form of JVM objects of Row or a collection of Row objects, which is represented in tabular for...
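Because the two APIs share the same engine, moving between them is cheap. A small sketch, assuming the Person case class and the df DataFrame from the sketches above:

[code lang="scala"]
import sqlContext.implicits._

// DataFrame -> Dataset: attach a type; the encoder for Person is resolved implicitly.
val peopleDS = df.as[Person]

// Dataset -> DataFrame: drop back to untyped rows for SQL-style operations.
val peopleDF = peopleDS.toDF()
[/code]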
scala> val inputDF = sc.parallelize(Seq(
     |   (1, "oclay", 400, "2015-01-01 00:00:00"),
     |   (1, "oclay", 800, "2018-01-01 00:00:00")
     | )).toDF("pid", "pname", "price", "last_mod")

scala> inputDF.show
+---+-----+-----+-------------------+
|pid|pname|price|           last_mod|
+---+-----+-----+-------------------+
|  1|oclay|  400|2015-01-01 00:00:00|
|  1|oclay|  800|2018-01-01 00:00:00|
+---+-----+-----+-------------------+

scala> import org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.expressions._
scala> import org.apache.spark.sql.functions._

// Return the latest record per key: partition by the key columns, order by the
// sort columns (descending) and keep only the first row of each partition.
def getLatestRecs(df: DataFrame, partition_col: List[String], sortCols: List[String]): DataFrame = {
  val part = Window.partitionBy(partition_col.head, partition_col.tail: _*)
    .orderBy(array(sortCols.head, sortCols.tail: _*).desc)
  val rowDF = df.withColumn("rn", row_number().over(part))
  val latestDF = rowDF.filter(col("rn") === 1).drop("rn")
  latestDF
}
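A quick usage sketch with the inputDF above: calling the helper with pid as the key and last_mod as the sort column keeps only the newest row per product.

[code lang="scala"]
val latest = getLatestRecs(inputDF, List("pid"), List("last_mod"))
latest.show()
// +---+-----+-----+-------------------+
// |pid|pname|price|           last_mod|
// +---+-----+-----+-------------------+
// |  1|oclay|  800|2018-01-01 00:00:00|
// +---+-----+-----+-------------------+
[/code]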
Caching and Persistence

By default, RDDs are recomputed each time you run an action on them. This can be expensive (in time) if you need to use a dataset more than once. Spark allows you to control what is cached in memory.

[code lang="scala"]val logs: RDD[String] = sc.textFile("/log.txt")
val logsWithErrors = logs.filter(_.contains("ERROR")).persist()
val firstnrecords = logsWithErrors.take(10)[/code]

Here, we cache logsWithErrors in memory. After firstnrecords is computed, Spark will store the contents of logsWithErrors for faster access in future operations if we would like to reuse it.

[code lang="scala"]val numErrors = logsWithErrors.count() // faster result[/code]

Now, computing the count on logsWithErrors is much faster. There are many ways to c...
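For instance, persist() can take an explicit storage level. A small sketch of the common options (levels from org.apache.spark.storage.StorageLevel, reusing the logs RDD above):

[code lang="scala"]
import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
val errors = logs.filter(_.contains("ERROR")).cache()

// Other levels trade memory for CPU or disk: MEMORY_AND_DISK spills partitions
// that do not fit in memory to disk, MEMORY_ONLY_SER keeps a serialized
// (more compact, more CPU to read) in-memory copy.
val warnings = logs.filter(_.contains("WARN"))
  .persist(StorageLevel.MEMORY_AND_DISK)
[/code]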
Transformations and Actions

Spark defines transformations and actions on RDDs.

Transformations – return new RDDs as results. They are lazy: their result RDD is not immediately computed.
Actions – compute a result based on an RDD, which is either returned to the driver program or saved to an external storage system (e.g., HDFS). They are eager: their result is immediately computed.

Laziness/eagerness is how we can limit network communication using the programming model.

Example – consider the following simple example:

[code lang="scala"]val input: List[String] = List("hi", "this", "is", "example")
val words = sc.parallelize(input)
val lengths = words.map(_.length)[/code]

Nothing has happened on the cluster at this point; execution of map (a transformation) is deferred. We need t...
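As a sketch of what triggers execution: adding an action such as reduce forces the deferred map to run on the cluster and returns a value to the driver.

[code lang="scala"]
// reduce is an action: it kicks off the job, runs the deferred map,
// and returns the total number of characters to the driver.
val totalChars = lengths.reduce(_ + _)
[/code]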
RDDs can be created in two ways:
1) Transforming an existing RDD.
2) From a SparkContext or SparkSession object.

– Transforming an existing RDD: just as map called on a List returns a new List, many higher-order functions defined on RDD return a new RDD.

– From a SparkContext (or SparkSession) object: the SparkContext object (superseded as the main entry point by SparkSession in later Spark versions) can be thought of as your handle to the Spark cluster. It represents the connection between the Spark cluster and your running application. It defines a handful of methods which can be used to create and populate a new RDD:
a) parallelize: convert a local Scala collection to an RDD. ex:- val rdd = sc.parallelize(Seq("1", "2", "3"))
b) textFile: read a text file from HDFS or a local file system and return an RDD of String. ex:- val ...
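A small sketch of both routes (the "/data/log.txt" path is just a placeholder):

[code lang="scala"]
// 1) Transforming an existing RDD: map returns a new RDD, the original is untouched.
val numbers = sc.parallelize(Seq("1", "2", "3"))
val doubled = numbers.map(_.toInt * 2)

// 2) From the SparkContext: textFile returns an RDD[String], one element per line.
val lines = sc.textFile("/data/log.txt")
[/code]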
Spark implements a distributed data-parallel model called Resilient Distributed Datasets (RDDs). Given some large dataset that can't fit into memory on a single node:
-> Chunk up the data (diagrams to be added).
-> Distribute it over the cluster of machines.
-> From there, think of your distributed data like a single collection.

RDDs are Spark's distributed collections. They seem a lot like immutable sequential or parallel Scala collections.

[code]abstract class RDD[T] {
  def map[U](f: T => U): RDD[U] = ...
  def flatMap[U](f: T => TraversableOnce[U]): RDD[U] = ...
  def filter(f: T => Boolean): RDD[T] = ...
  def reduce(f: (T, T) => T): T = ...
}[/code]

Most operations on RDDs, like Scala's immutable List and Scala's parallel collections, ar...
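A small sketch of that similarity: the same higher-order functions read identically on a local List and on an RDD (the data here is made up).

[code lang="scala"]
// Local Scala collection
val localWords = List("spark", "rdd", "api")
val localLengths = localWords.map(_.length).filter(_ > 3)

// Distributed collection: the same operations, evaluated across the cluster
val distWords = sc.parallelize(localWords)
val distLengths = distWords.map(_.length).filter(_ > 3)
val total = distLengths.reduce(_ + _) // 5: only "spark".length survives the filter
[/code]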
Hadoop/MapReduce

Hadoop is a widely-used large-scale batch data processing framework. It's an open-source implementation of Google's MapReduce. MapReduce was ground-breaking because it provided:
-> a simple API (simple map and reduce steps)
-> fault tolerance

Fault tolerance is what made it possible for Hadoop/MapReduce to scale to 100s or 1000s of nodes at all.

Hadoop/MapReduce + fault tolerance: why is this important? On 100s or 1000s of old commodity machines, the likelihood of at least one node failing midway through a job is very high. Thus, Hadoop/MapReduce's ability to recover from node failure enabled:
-> computations on unthinkably large data sets to succeed to completion.

Fault tolerance + simple API: at Google, MapReduce made it possible for an average Googl...
The primary concept behind big data analysis is parallelism, defined in computing as the simultaneous execution of processes. The reason for this parallelism is mainly to make analysis faster, but it is also because some data sets may be too dynamic, too large or simply too unwieldy to be placed efficiently in a single relational database. Parallelism is a very important concept when it comes to data processing. Scala achieves data parallelism on a single compute node (shared memory), while Spark achieves data parallelism in a distributed fashion, spread across multiple nodes, which makes processing much faster.

Shared Memory Data Parallelism (Scala) –
-> Split the data.
-> Workers/threads independently operate on the data in parallel.
-> Combine when done....
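A tiny sketch of the shared-memory case, assuming Scala 2.12-style parallel collections (in Scala 2.13 they moved to the separate scala-parallel-collections module):

[code lang="scala"]
val data = (1 to 1000000).toVector

// .par splits the collection, worker threads map the chunks in parallel
// on one machine, and the partial results are combined at the end.
val sumOfSquares = data.par.map(x => x.toLong * x).sum
[/code]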