Deep dive into Partitioning in Spark – Hash Partitioning and Range Partitioning

Partitions-
The data within an RDD is split into several partitions.

Properties of partitions:
– Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine.
– Each machine in the cluster contains one or more partitions.
– The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes.

Two kinds of partitioning available in Spark:
– Hash partitioning
– Range partitioning

Customizing the partitioning is only possible on Pair RDDs, because partitioning is done based on keys.

Hash partitioning-
Given a Pair RDD that should be grouped:
val purchasesPerCust =
  purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
    .groupByKey()
groupByKey first computes per tuple (k, v) its partition p:
p = k.hashCode() % numPartitions
Then, all tuples in the same partition p are sent to the machine hosting p.
Intuition: hash partitioning attempts to spread data evenly across
partitions based on the key.
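
The rule above can be sketched in plain Scala, without a cluster. One detail the formula glosses over: on the JVM, % keeps the sign of the dividend, so a key with a negative hashCode would give a negative index. Spark's HashPartitioner folds the result back into the range 0 until numPartitions and sends null keys to partition 0. The object and method names below are hypothetical; this is a minimal sketch of the idea, not Spark's source.

```scala
object HashPartitionSketch {
  // Modulo that never returns a negative value.
  // Plain % would give -3 for (-7 % 4); this returns 1 instead.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index for a key: null goes to partition 0,
  // everything else is placed by its hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    println(partitionFor(401, 4)) // prints 1
    println(partitionFor(-7, 4))  // prints 1
  }
}
```

For non-negative hash codes this is exactly p = k.hashCode() % numPartitions.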

Range partitioning-
Pair RDDs may contain keys that have an ordering defined.
Examples: Int, Char, String, ...
For such RDDs, range partitioning may be more efficient.
Using a range partitioner, keys are partitioned according to:
1. an ordering for keys
2. a set of sorted ranges of keys
Property: tuples with keys in the same range appear on the same machine.

Hash Partitioning: Example

Consider a Pair RDD with keys [8, 96, 240, 400, 401, 800] and a desired number of partitions of 4.
Furthermore, suppose that hashCode() is the identity (n.hashCode() == n).
In this case, hash partitioning distributes the keys as follows among the partitions:

– partition 0: [8, 96, 240, 400, 800]

– partition 1: [401]

– partition 2: []

– partition 3: []

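The result is a very unbalanced distribution, which hurts performance: the task handling partition 0 processes five of the six keys, while partitions 2 and 3 stay empty.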
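
The distribution listed above can be reproduced with a few lines of plain Scala. This is only a sketch under the same assumption as the example (Int keys, whose hashCode is the value itself); the object and method names are hypothetical, and no Spark is involved.

```scala
object HashExample {
  // Group keys by (hashCode % numPartitions), and make sure
  // partitions that received no key still show up as empty.
  def distribute(keys: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] = {
    val grouped = keys.groupBy(k => k.hashCode % numPartitions)
    (0 until numPartitions)
      .map(p => p -> grouped.getOrElse(p, Seq.empty[Int]))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val result = distribute(Seq(8, 96, 240, 400, 401, 800), 4)
    (0 until 4).foreach { p =>
      val contents = result(p).mkString("[", ", ", "]")
      println(s"partition $p: $contents")
    }
  }
}
```

The skew comes from the keys themselves: every key except 401 is a multiple of 4, so they all land in partition 0.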

Range Partitioning: Example
Using range partitioning, the distribution can be improved significantly:
Assumptions: (a) keys non-negative, (b) 800 is biggest key in the RDD.
Set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
In this case, range partitioning distributes the keys as follows among the partitions:

– partition 0: [8, 96]

– partition 1: [240, 400]

– partition 2: [401]

– partition 3: [800]

The resulting partitioning is much more balanced.
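
To make the lookup concrete, here is a minimal sketch in plain Scala. It assumes the four ranges from the example are represented by their inclusive upper bounds 200, 400 and 600 (the last range takes everything above 600). A key's partition is then the number of bounds strictly smaller than the key. The names are hypothetical, and this is not how Spark's RangePartitioner is implemented internally.

```scala
object RangeExample {
  // Inclusive upper bounds of all ranges except the last one.
  val upperBounds: Vector[Int] = Vector(200, 400, 600)

  // Count how many bounds lie strictly below the key.
  // Key 400 -> only 200 is smaller -> partition 1.
  def partitionFor(key: Int, bounds: Vector[Int]): Int =
    bounds.count(b => b < key)

  // Group keys by partition, keeping empty partitions visible.
  def distribute(keys: Seq[Int], bounds: Vector[Int]): Map[Int, Seq[Int]] = {
    val grouped = keys.groupBy(k => partitionFor(k, bounds))
    (0 to bounds.length)
      .map(p => p -> grouped.getOrElse(p, Seq.empty[Int]))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val result = distribute(Seq(8, 96, 240, 400, 401, 800), upperBounds)
    (0 to upperBounds.length).foreach { p =>
      val contents = result(p).mkString("[", ", ", "]")
      println(s"partition $p: $contents")
    }
  }
}
```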

Partitioning Data: partitionBy

Invoking partitionBy creates an RDD with a specified partitioner.
Example:
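import org.apache.spark.RangePartitioner

val pairs = purchasesRdd.map(p => (p.customerId, p.price))

// Sample `pairs` to build 8 sorted key ranges, then shuffle once and cache the result.
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()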

Creating a RangePartitioner requires:

1. Specifying the desired number of partitions.
2. Providing a Pair RDD with ordered keys. This RDD is sampled to create a suitable set of sorted ranges.
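
To give a feel for how sampled keys can turn into sorted ranges, here is a deliberately simplified sketch in plain Scala: sort the sample and take evenly spaced elements as range boundaries. Spark's actual sampling and boundary selection are more involved; the function name and the evenly-spaced strategy are assumptions made for illustration only.

```scala
object RangeBoundsSketch {
  // Pick (numPartitions - 1) boundaries from a sorted sample so that
  // roughly the same number of sampled keys falls into each range.
  def boundsFromSample(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    require(numPartitions >= 1, "need at least one partition")
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty) Vector.empty
    else
      (1 until numPartitions)
        .map(i => sorted(i * sorted.length / numPartitions))
        .distinct // duplicate sample keys must not produce empty ranges
        .toVector
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(800, 8, 401, 96, 400, 240)
    println(boundsFromSample(sample, 4)) // prints Vector(96, 400, 401)
  }
}
```

Because the boundaries follow the sampled keys rather than fixed intervals, dense regions of the key space get narrower ranges.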

Important: the result of partitionBy should be persisted. Otherwise, the partitioning is applied again (which involves shuffling!) each time the partitioned RDD is used.

About the author: an ambivert, music lover, enthusiast, artist, designer, coder, gamer, and content writer. He is a professional software developer with hands-on experience in Spark, Kafka, Scala, Python, Hadoop, Hive, Sqoop, Pig, PHP, HTML, and CSS. Know more about him at www.saikumar.me