Transpose Column to Row with Spark

It is relatively simple to do with basic Spark SQL functions.

Python:

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
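
For the sample DataFrame above, to_long(df, ["A"]).show() should produce:

+---+-----+---+
|  A|  key|val|
+---+-----+---+
|  1|col_1|0.0|
|  1|col_2|0.6|
|  1|col_1|0.6|
|  1|col_2|0.7|
+---+-----+---+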

Scala:

// Assumes a spark-shell session or an existing SparkSession named `spark`
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import spark.implicits._

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  // Keep only the columns that are not part of the grouping key
  val (cols, types) = df.dtypes.filter { case (c, _) => !by.contains(c) }.unzip
  require(types.distinct.size == 1, s"All columns must share one type, found: ${types.distinct.mkString(", ")}")

  // Create and explode an array of (column_name, column_value) structs
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

Convert columns to rows in Spark SQL

It's the opposite of pivot: it's called unpivot.

In Spark, unpivoting is implemented using the stack function.
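
As a quick illustration of stack semantics (the label/value aliases below are just for this demo), stack(n, ...) distributes its arguments across n rows, e.g. in a PySpark shell:

spark.sql("SELECT stack(2, 'a', 1, 'b', 2) AS (label, value)").show()
# +-----+-----+
# |label|value|
# +-----+-----+
# |    a|    1|
# |    b|    2|
# +-----+-----+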

Using PySpark, this is what you could do if you didn't have many columns:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(101, 3, 520, 2001),
     (102, 29, 530, 2020)],
    ['ID', 'Value1', 'Value2', 'Value40'])

df = df.select(
    "ID",
    F.expr("stack(3, Value1, 'Value1', Value2, 'Value2', Value40, 'Value40') as (ValueVv, ValueDesc)")
)

If you have lots of columns, you can build the stack expression dynamically:

cols_to_unpivot = [f"`{c}`, '{c}'" for c in df.columns if c != 'ID']
stack_string = ", ".join(cols_to_unpivot)
df = df.select(
    "ID",
    F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (ValueVv, ValueDesc)")
)
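
For the example DataFrame, the generated expression expands to:

stack(3, `Value1`, 'Value1', `Value2`, 'Value2', `Value40`, 'Value40')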

For the example data, both versions return:

+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101|      3|   Value1|
|101|    520|   Value2|
|101|   2001|  Value40|
|102|     29|   Value1|
|102|    530|   Value2|
|102|   2020|  Value40|
+---+-------+---------+

Transpose specific columns to rows using PySpark

You can construct an array of structs from the columns, and then explode the arrays and expand the structs to get the desired output.
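
The question's input DataFrame isn't shown here; a minimal one consistent with the output below (and reusing the SparkSession from the previous example) would be:

df = spark.createDataFrame(
    [(1, 'google', 'web', 100, 200, 55, 66)],
    ['id', 'company', 'type', 'rev2016', 'rev2017', 'main2016', 'main2017'])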

import pyspark.sql.functions as F

struct_list = [
    F.struct(
        F.lit('rev').alias('Metric'),
        F.col('rev2016').alias('2016'),
        F.col('rev2017').alias('2017')
    ),
    F.struct(
        F.lit('main').alias('Metric'),
        F.col('main2016').alias('2016'),
        F.col('main2017').alias('2017')
    )
]

df2 = df.withColumn(
    'arr',
    F.explode(F.array(*struct_list))
).select('id', 'company', 'type', 'arr.*')

df2.show()
+---+-------+----+------+----+----+
| id|company|type|Metric|2016|2017|
+---+-------+----+------+----+----+
|  1| google| web|   rev| 100| 200|
|  1| google| web|  main|  55|  66|
+---+-------+----+------+----+----+

Or you can use stack:

df2 = df.selectExpr(
    'id', 'company', 'type',
    "stack(2, 'rev', rev2016, rev2017, 'main', main2016, main2017) as (Metric, `2016`, `2017`)"
)

df2.show()
+---+-------+----+------+----+----+
| id|company|type|Metric|2016|2017|
+---+-------+----+------+----+----+
|  1| google| web|   rev| 100| 200|
|  1| google| web|  main|  55|  66|
+---+-------+----+------+----+----+

Transposing rows to columns in PySpark

You can apply the pivot operation to transpose rows to columns.


from pyspark.sql import functions as F

data = [("Key1", "Value1", ),
("Key2", "Value2", ),
("Key3", "Value3", ), ]

df = spark.createDataFrame(data, ("Key", "Value", ))

df.groupBy().pivot("Key").agg(F.first("Value")).show()

"""
+------+------+------+
|  Key1|  Key2|  Key3|
+------+------+------+
|Value1|Value2|Value3|
+------+------+------+
"""

Transpose a single DataFrame row to columns in Spark with Scala

Here is a different approach that avoids arrays_zip (which is only available in Spark >= 2.4).

It works from Spark 2.0 onwards in a simpler way, using the flatMap, map, and explode functions.

Here the map function (used inside withColumn) creates a new map column. The input columns must be supplied as alternating key-value pairs.

Case 1: all columns are strings:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("val1", "val2", "val3", "val4", "val5")).toDF("col1", "col2", "col3", "col4", "col5")

// Interleave each column name (as a literal) with the column itself: name1, col1, name2, col2, ...
val columnsAndValues = df.columns.flatMap { c => Array(lit(c), col(c)) }
df.printSchema()

df.withColumn("myMap", map(columnsAndValues: _*))
  .select(explode($"myMap"))
  .toDF("Columns", "Values")
  .show(false)

Result:

root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- col4: string (nullable = true)
|-- col5: string (nullable = true)

+-------+------+
|Columns|Values|
+-------+------+
|col1   |val1  |
|col2   |val2  |
|col3   |val3  |
|col4   |val4  |
|col5   |val5  |
+-------+------+

Case 2: mixed data types:

If the columns have different types, convert them all to String first; the remaining steps won't change:

val df1 = df.select(df.columns.map(c => col(c).cast(StringType)): _*)

Full example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Column
import spark.implicits._

val df = Seq((2, 3, true, 2.4, "val")).toDF("col1", "col2", "col3", "col4", "col5")
df.printSchema()

// Convert all columns to string type, since all values of a map must share one type
val df1 = df.select(df.columns.map(c => col(c).cast(StringType)): _*)
df1.printSchema()

val columnsAndValues: Array[Column] = df.columns.flatMap { c => Array(lit(c), col(c)) }

df1.withColumn("myMap", map(columnsAndValues: _*))
  .select(explode($"myMap"))
  .toDF("Columns", "Values")
  .show(false)

Result:

root
|-- col1: integer (nullable = false)
|-- col2: integer (nullable = false)
|-- col3: boolean (nullable = false)
|-- col4: double (nullable = false)
|-- col5: string (nullable = true)

root
|-- col1: string (nullable = false)
|-- col2: string (nullable = false)
|-- col3: string (nullable = false)
|-- col4: string (nullable = false)
|-- col5: string (nullable = true)

+-------+------+
|Columns|Values|
+-------+------+
|col1   |2     |
|col2   |3     |
|col3   |true  |
|col4   |2.4   |
|col5   |val   |
+-------+------+

