Transpose column to row with Spark
It is relatively simple to do with basic Spark SQL functions.
Python:
from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")
    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
Scala:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import spark.implicits._

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  // Filter dtypes and split into column names and type description
  val (cols, types) = df.dtypes.filter { case (c, _) => !by.contains(c) }.unzip
  // Spark SQL supports only homogeneous columns
  require(types.distinct.size == 1, s"All columns have to be of the same type, found: ${types.distinct.mkString(", ")}")

  // Create and explode an array of (column_name, column_value) structs
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
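For the sample data, calling .show() on the result of either version should give:
+---+-----+---+
|  A|  key|val|
+---+-----+---+
|  1|col_1|0.0|
|  1|col_2|0.6|
|  1|col_1|0.6|
|  1|col_2|0.7|
+---+-----+---+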
Convert columns to rows in Spark SQL
It's the opposite of pivot; it's called unpivot. In Spark, unpivoting is implemented with the stack function.
Using PySpark, this is what you could do if you didn't have many columns:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(101, 3, 520, 2001),
     (102, 29, 530, 2020)],
    ['ID', 'Value1', 'Value2', 'Value40'])

df = df.select(
    "ID",
    F.expr("stack(3, Value1, 'Value1', Value2, 'Value2', Value40, 'Value40') as (ValueVv, ValueDesc)")
)
From your example it looks like you may have lots of columns. In that case you can build the stack expression dynamically:
cols_to_unpivot = [f"`{c}`, '{c}'" for c in df.columns if c != 'ID']
stack_string = ", ".join(cols_to_unpivot)
df = df.select(
    "ID",
    F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (ValueVv, ValueDesc)")
)
For the example data both versions return
+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101| 3| Value1|
|101| 520| Value2|
|101| 2001| Value40|
|102| 29| Value1|
|102| 530| Value2|
|102| 2020| Value40|
+---+-------+---------+
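As a side note, on Spark 3.4 and later you don't need to build the stack expression at all, because DataFrame.unpivot (alias melt) is built in. A minimal sketch, applied to the original df created above (before it is overwritten by the stack select); note that unpivot emits the name column before the value column, so the column order differs from the stack version:
# Requires Spark >= 3.4
df_long = df.unpivot(
    ids=["ID"],                              # identifier columns to keep as-is
    values=["Value1", "Value2", "Value40"],  # columns to turn into rows
    variableColumnName="ValueDesc",          # receives the former column names
    valueColumnName="ValueVv",               # receives the values
)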
Transpose specific columns to rows using PySpark
You can construct an array of structs from the columns, then explode the array and expand the structs to get the desired output.
import pyspark.sql.functions as F

struct_list = [
    F.struct(
        F.lit('rev').alias('Metric'),
        F.col('rev2016').alias('2016'),
        F.col('rev2017').alias('2017')
    ),
    F.struct(
        F.lit('main').alias('Metric'),
        F.col('main2016').alias('2016'),
        F.col('main2017').alias('2017')
    )
]

df2 = df.withColumn(
    'arr',
    F.explode(F.array(*struct_list))
).select('id', 'company', 'type', 'arr.*')

df2.show()
+---+-------+----+------+----+----+
| id|company|type|Metric|2016|2017|
+---+-------+----+------+----+----+
| 1| google| web| rev| 100| 200|
| 1| google| web| main| 55| 66|
+---+-------+----+------+----+----+
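If there are many metrics, the struct list doesn't have to be written out by hand. A hypothetical sketch, assuming every metric follows the same rev2016/rev2017 naming pattern:
metrics = ['rev', 'main']  # hypothetical list of metric prefixes
struct_list = [
    F.struct(
        F.lit(m).alias('Metric'),
        F.col(f'{m}2016').alias('2016'),
        F.col(f'{m}2017').alias('2017')
    )
    for m in metrics
]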
Or you can use stack:
df2 = df.selectExpr(
    'id', 'company', 'type',
    "stack(2, 'rev', rev2016, rev2017, 'main', main2016, main2017) as (Metric, `2016`, `2017`)"
)
df2.show()
+---+-------+----+------+----+----+
| id|company|type|Metric|2016|2017|
+---+-------+----+------+----+----+
| 1| google| web| rev| 100| 200|
| 1| google| web| main| 55| 66|
+---+-------+----+------+----+----+
Transposing rows to columns in PySpark
You can apply the pivot operation to transpose rows to columns.
from pyspark.sql import functions as F

data = [("Key1", "Value1"),
        ("Key2", "Value2"),
        ("Key3", "Value3")]
df = spark.createDataFrame(data, ("Key", "Value"))

df.groupBy().pivot("Key").agg(F.first("Value")).show()
"""
+------+------+------+
| Key1| Key2| Key3|
+------+------+------+
|Value1|Value2|Value3|
+------+------+------+
"""
Transpose DataFrame single row to column in Spark with Scala
Here is a different approach that avoids arrays_zip (which is only available in Spark >= 2.4). It works from Spark 2.0 onwards and needs only the flatMap, map, and explode functions. The map function (used here with withColumn) creates a new map column; its input columns must be grouped as key-value pairs.
Case: all columns are of string type:
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("val1", "val2", "val3", "val4", "val5")).toDF("col1", "col2", "col3", "col4", "col5")
// Interleave literal column names with column values: key, value, key, value, ...
val columnsAndValues = df.columns.flatMap { c => Array(lit(c), col(c)) }
df.printSchema()
df.withColumn("myMap", map(columnsAndValues: _*))
  .select(explode($"myMap"))
  .toDF("Columns", "Values")
  .show(false)
Result:
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- col4: string (nullable = true)
|-- col5: string (nullable = true)
+-------+------+
|Columns|Values|
+-------+------+
|col1 |val1 |
|col2 |val2 |
|col3 |val3 |
|col4 |val4 |
|col5 |val5 |
+-------+------+
Case: mixed data types:
If the columns have different types, convert them all to String first; the remaining steps won't change:
val df1 = df.select(df.columns.map(c => col(c).cast(StringType)): _*)
Full example:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Column
import spark.implicits._

val df = Seq((2, 3, true, 2.4, "val")).toDF("col1", "col2", "col3", "col4", "col5")
df.printSchema()

// Convert all columns to string type, since all map values must share one type
val df1 = df.select(df.columns.map(c => col(c).cast(StringType)): _*)
df1.printSchema()

val columnsAndValues: Array[Column] = df1.columns.flatMap { c => Array(lit(c), col(c)) }

df1.withColumn("myMap", map(columnsAndValues: _*))
  .select(explode($"myMap"))
  .toDF("Columns", "Values")
  .show(false)
Result:
root
|-- col1: integer (nullable = false)
|-- col2: integer (nullable = false)
|-- col3: boolean (nullable = false)
|-- col4: double (nullable = false)
|-- col5: string (nullable = true)
root
|-- col1: string (nullable = false)
|-- col2: string (nullable = false)
|-- col3: string (nullable = false)
|-- col4: string (nullable = false)
|-- col5: string (nullable = true)
+-------+------+
|Columns|Values|
+-------+------+
|col1 |2 |
|col2 |3 |
|col3 |true |
|col4 |2.4 |
|col5 |val |
+-------+------+
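For completeness, here is a rough PySpark sketch of the same map/explode technique; create_map plays the role of Scala's map function, and an active SparkSession named spark is assumed:
from itertools import chain
import pyspark.sql.functions as F

# Same one-row example; cast everything to string so the map values share one type
df = spark.createDataFrame([(2, 3, True, 2.4, "val")],
                           ["col1", "col2", "col3", "col4", "col5"])
df1 = df.select([F.col(c).cast("string") for c in df.columns])

# Interleave literal column names with column values: name, value, name, value, ...
kv = F.create_map(*chain.from_iterable((F.lit(c), F.col(c)) for c in df1.columns))

df1.select(F.explode(kv).alias("Columns", "Values")).show(truncate=False)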