How can we Pivot and Unpivot data in Spark (or) What is Pivot in Spark?

In Big Data applications we rarely get a requirement to pivot the data, because transposing billions of rows into billions of columns doesn't make sense. But sometimes we get dimension tables or specification tables that demand a pivot (transpose). In that case we need to transform rows into columns and columns into rows. There is, however, a small difference between a normal pivot operation and the pivot method in Spark. Let's look into it.


pivot() in Spark

pivot() is an aggregation operation in Spark. Called on the result of a groupBy, it takes a pivot column and rotates the data around it, turning the column's distinct values into output columns. It can also take a list or sequence of values from the pivot column, to transpose the data for those values only.
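Before looking at the Spark API, here is a plain Scala sketch of what a pivot does conceptually, with no Spark involved (the sample rows and the nested-map representation are illustrative, not part of the Spark example below):

```scala
// Each row is (emp_name, salary, dept), mirroring the DataFrame used later.
val rows = Seq(("AAA", 10000, "IT"), ("BBB", 20000, "Sales"), ("AAA", 40000, "IT"))

// Group rows by dept, then "spread" emp_name values into columns (map keys),
// averaging the salaries that fall into each (dept, emp_name) cell.
val pivoted: Map[String, Map[String, Double]] =
  rows.groupBy(_._3).map { case (dept, rs) =>
    dept -> rs.groupBy(_._1).map { case (name, cells) =>
      name -> cells.map(_._2).sum.toDouble / cells.size
    }
  }

// pivoted("IT")("AAA") averages 10000 and 40000 → 25000.0
```

Spark's pivot() does the same thing at scale, with the inner map keys becoming real DataFrame columns.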

Let's see the code snippets to perform pivot operation.

The code below creates a DataFrame of employee data with three columns: emp_name, salary, and dept.

scala> val seq_data = Seq(("AAA",10000,"IT"),("BBB",20000,"Sales"),("CCC",30000,"IT"),("AAA",40000,"IT"),("EEE",50000,"Admin"),("FFF",60000,"Admin"),("AAA",70000,"IT"),("BBB",80000,"IT"))
seq_data: Seq[(String, Int, String)] = List((AAA,10000,IT), (BBB,20000,Sales), (CCC,30000,IT), (AAA,40000,IT), (EEE,50000,Admin), (FFF,60000,Admin), (AAA,70000,IT), (BBB,80000,IT))

scala> val data = seq_data.toDF("emp_name","salary","dept")
data: org.apache.spark.sql.DataFrame = [emp_name: string, salary: int ... 1 more field]

|emp_name|salary| dept|
|     AAA| 10000|   IT|
|     BBB| 20000|Sales|
|     CCC| 30000|   IT|
|     AAA| 40000|   IT|
|     EEE| 50000|Admin|
|     FFF| 60000|Admin|
|     AAA| 70000|   IT|
|     BBB| 80000|   IT|

Now, if we want to pivot the data around the emp_name column and compute the average salary per employee within each department, we can write the code snippet shown below:

scala> val pivotedDf = data.groupBy("dept").pivot("emp_name").avg("salary")
| dept|    AAA|    BBB|    CCC|    EEE|    FFF|
|Sales|   null|20000.0|   null|   null|   null|
|Admin|   null|   null|   null|50000.0|60000.0|
|   IT|40000.0|80000.0|30000.0|   null|   null|

The above code pivots on every distinct value of the emp_name column. Because we did not supply the pivot values, Spark must first compute the distinct values of the pivot column eagerly, so part of the job runs immediately and we lose some of the benefit of lazy evaluation. To avoid this, as a performance enhancement, pivot() can take a list or sequence of values from the pivot column and transpose the data only for those values. In that case the whole pivot remains a transformation.

Below is the code that supplies the pivot values explicitly:

scala> val pivotedDf = data.groupBy("dept").pivot("emp_name",Seq("AAA","BBB")).avg("salary")
| dept|    AAA|    BBB|
|Sales|   null|20000.0|
|Admin|   null|   null|
|   IT|40000.0|80000.0|
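pivot() can also be combined with agg() to compute several aggregations per pivoted value in one pass. A minimal sketch, assuming the same `data` DataFrame as above (the avg/max combination is illustrative, not from the original example):

```scala
import org.apache.spark.sql.functions.{avg, max}

// For each department, pivot on emp_name (restricted to AAA and BBB)
// and compute both the average and the maximum salary per employee.
val multiAggDf = data
  .groupBy("dept")
  .pivot("emp_name", Seq("AAA", "BBB"))
  .agg(avg("salary"), max("salary"))

// Spark names the resulting columns by combining the pivot value and the
// aggregation, e.g. "AAA_avg(salary)" and "AAA_max(salary)".
```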


Unpivot in Spark

Unpivot restores the original shape of the data: it transposes the columns back into rows. Spark's DataFrame API has no dedicated method for this here, so we take the help of select() together with the built-in stack() SQL function via expr(). Note that stack() is a Spark SQL generator function that produces multiple rows from one input row, not the classic stack data structure.

Below is the code that performs the unpivot operation on the fully pivoted DataFrame from the first pivot example (the one with all five employee columns):

scala> import org.apache.spark.sql.functions.expr

scala> val unPivotedDf = pivotedDf.select($"dept", expr("stack(5, 'AAA',AAA,'BBB',BBB,'CCC',CCC,'EEE',EEE,'FFF',FFF) as (emp_name,salary)")).where($"salary".isNotNull)
| dept|emp_name| salary|
|Sales|     BBB|20000.0|
|Admin|     EEE|50000.0|
|Admin|     FFF|60000.0|
|   IT|     AAA|40000.0|
|   IT|     BBB|80000.0|
|   IT|     CCC|30000.0|
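As an aside, recent Spark versions (3.4 onwards, to the best of my knowledge) ship a built-in unpivot() method (also aliased as melt()) on Dataset, so the stack() trick is no longer required there. A hedged sketch, assuming the five-column `pivotedDf` from the first pivot example:

```scala
// Requires Spark 3.4+. Fold the per-employee columns back into rows,
// keeping dept as the identifier column.
val unPivotedDf2 = pivotedDf.unpivot(
  Array($"dept"),                                 // id columns to keep as-is
  Array($"AAA", $"BBB", $"CCC", $"EEE", $"FFF"),  // columns to fold into rows
  "emp_name",                                     // name of the new key column
  "salary"                                        // name of the new value column
).where($"salary".isNotNull)
```

On older versions, the stack() approach shown above remains the standard workaround.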

