Apache Spark Transformations & Actions

Apache Spark RDD Operations

Apache Spark RDD supports two types of operations the first one is “Transformations” and the second one is “Actions”.Transformations create a new RDD from the existing RDD by applying Transformation functions and when we want to do some operation on that RDD then we call Actions and it returns the result.

In this tutorial, we will learn about RDD Transformations and Actions like what is Transformations in RDD and its various methods with examples and will learn what is actions and its methods with examples.

Spark RDD provides the following two types of operations.




Let us understand each operation in detail.


1. Transformations

RDD transformations are the methods that we apply to a dataset to create a new RDD. It will work on RDD and create a new RDD by applying transformation functions. The newly created RDDs are immutable in nature and can’t be changed. All transformations in Spark are lazy in nature that means when any transformation is applied to the RDD such as map (), filter (), or flatMap(), it does nothing and waits for actions and when actions like collect(), take(), foreach() invoke it does actual transformation/computation on the result of RDD.

There are two types of Transformations.

    A. Narrow Transformations

    B. Wide Transformations

Let us understand each operation in detail.


A. Narrow Transformations

In narrow transformations, each partition of the parent RDD is used by at most one partition of the child RDD and it is the result of methods like map(), filter() and union(), etc.




B. Wide Transformations

In wide transformations, each multiple child RDD partitions may depend on a single parent RDD partition and it is the result of methods like group by, reduce by, join, etc.



There are many transformation methods present for Spark RDD.

Let us see each with an example.

Start Spark shell using the below command (Please check your Spark directory).

/home/cloudduggu/spark$spark-shell


map()

It will return a new distributed dataset formed by passing each element of the source through a function.

scala>val x = sc.parallelize(Array("b", "a", "c","d","e")) scala>val y = x.map(z => (z,1)) scala>println(x.collect().mkString(", ")) scala>println(y.collect().mkString(", "))




filter()

It will return a new dataset formed by selecting those elements of the source on which function returns true.

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10)) scala>val y = x.filter(n => n%2 == 1) scala>println(x.collect().mkString(", ")) scala>println(y.collect().mkString(", "))




flatMap()

It is similar to the map, but each input item can be mapped to 0 or more output items (so the function should return a Seq rather than a single item).

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10)) scala>val y = x.flatMap(n => Array(n, n*100, 42)) scala>println(x.collect().mkString(", ")) scala>println(y.collect().mkString(", "))




groupByKey()

It will group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

scala>val x = sc.parallelize( Array("Spark", "Hadoop", "Hive", "Scala")) scala>val y = x.groupBy(w => w.charAt(0)) scala>println(y.collect().mkString(", "))




mappartitions()

It is similar to the map but runs separately on each partition (block) of the RDD.

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2) scala>def f(i:Iterator[Int])={ (i.sum,42).productIterator } scala>val y = x.mapPartitions(f) // glom() flattens elements on the same partition scala>val xOut = x.glom().collect() scala>val yOut = y.glom().collect()




mapPartitionsWithIndex()

It will return a new RDD by applying a function to each partition of this RDD while tracking the index of the original partition.

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), 2) scala>def f(partitionIndex:Int, i:Iterator[Int]) = { (partitionIndex, i.sum).productIterator } scala>val y = x.mapPartitionsWithIndex(f) // glom() flattens elements on the same partition scala>val xOut = x.glom().collect() scala>val yOut = y.glom().collect()




sample()

It will return a new RDD containing a statistical sample of the original RDD.

scala>val x = sc.parallelize(Array(1, 2, 3, 4, 5,6,7,8,9)) scala>val y = x.sample(false, 0.4) // omitting seed will yield different output scala>println(y.collect().mkString(", "))




union()

It returns a new dataset that contains the union of the elements in the source dataset and the argument.

scala>val x = sc.parallelize(Array(1,2,3,4,5), 2) scala>val y = sc.parallelize(Array(3,4), 1) scala>val z = x.union(y) scala>val zOut = z.glom().collect()




join()

It returns a new RDD containing all pairs of elements having the same key in the original RDDs.

scala>val x = sc.parallelize(Array(("a", 1), ("b", 2))) scala>val y = sc.parallelize(Array(("a", 3), ("a", 4), ("b", 5))) scala>val z = x.join(y) scala>println(z.collect().mkString(", "))




distinct()

It returns a new RDD containing distinct items from the original RDD (omitting all duplicates).

scala>val x = sc.parallelize(Array(1,2,3,3,4,4,4,4,5,5,5,6,7,7,8,8,8,9,9,9)) scala>val y = x.distinct() scala>println(y.collect().mkString(", "))




coalesce()

It returns a new RDD which is reduced to a smaller number of partitions.

scala>val x = sc.parallelize(Array(1, 2, 3, 4, 5 , 6, 7, 8, 9), 3) scala>val y = x.coalesce(2) scala>val xOut = x.glom().collect() scala>val yOut = y.glom().collect()




keyby()

It creates a Pair RDD, forming one pair for each item in the original RDD. The pair’s key is calculated from the value via a user-supplied function.

scala>val x = sc.parallelize( Array("Spark", "Hive", "Hadoop", "Java")) scala>val y = x.keyBy(w => w.charAt(0)) scala>println(y.collect().mkString(", "))




2. Actions

Actions function works on the RDD which is created during the transformation phase. After performing actions the final result is returned to the driver program. For example, the Map function performs the transformation on the dataset and returns new RDD, and the reduce functions perform the aggregation on the newly created RDD and return the result to the driver.

There are many action methods present in Spark RDD.

Let us see each with an example.


Getnumpartitions()

It returns the number of partitions in RDD.

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7), 2) scala>val y = x.partitions.size scala>val xOut = x.glom().collect() scala>println(y)




collect()

It returns all items in the RDD to the driver in a single list.

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 2) scala>val y = x.collect() scala>val xOut = x.glom().collect() scala>println(y)




reduce()

It will aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results and returns a result to the driver.

scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9)) scala>val y = x.reduce((a,b) => a+b) scala>println(x.collect.mkString(", ")) scala>println(y)




aggregate()

It aggregates all the elements of the RDD by applying a user function to combine elements with user-supplied objects, - then combining those user-defined results via a second user function, - and finally returning a result to the driver.

scala>def seqOp = (data:(Array[Int], Int), item:Int) => (data._1 :+ item, data._2 + item) scala>def combOp = (d1:(Array[Int], Int), d2:(Array[Int], Int)) => (d1._1.union(d2._1), d1._2 + d2._2) scala>val x = sc.parallelize(Array(1,2,3,4,5,6,7,8)) scala>val y = x.aggregate((Array[Int](), 0))(seqOp, combOp) scala>println(y)




max()

It will return the maximum item in the RDD.

scala>val x = sc.parallelize(Array(2,4,1,5,10,1)) scala>val y = x.max scala>println(x.collect().mkString(", ")) scala>println(y)




sum()

It will return the sum of the items in the RDD.

scala>val x = sc.parallelize(Array(2,4,1,10,65)) scala>val y = x.sum scala>println(x.collect().mkString(", ")) scala>println(y)




mean()

It will return the mean of the items in the RDD.

scala>val x = sc.parallelize(Array(2,4,1,5,6,7,11,56,32)) scala>val y = x.mean scala>println(x.collect().mkString(", ")) scala>println(y)




stdev()

It returns the standard deviation of the items in the RDD.

scala>val x = sc.parallelize(Array(2,4,1,5,6,7,11,56,32)) scala>val y = x.mean scala>println(x.collect().mkString(", ")) scala>println(y)




countbykey()

It returns the standard deviation of the items in the RDD.

scala>val x = sc.parallelize(Array(('J',"Jony"),('F',"Friend"), ('A',"Apple"),('J',"Joker"))) scala>val y = x.countByKey() scala>println(y)

rdd countbykey