Pivot String Column on Pyspark Dataframe

Assuming that (id, type, date) combinations are unique and your only goal is pivoting and not aggregation, you can use first (or any other function not restricted to numeric values):

from pyspark.sql.functions import first

(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(first("ship"))
    .show())

## +---+----+------+------+------+
## | id|type|201601|201602|201603|
## +---+----+------+------+------+
## | 2| C| DOCK| null| null|
## | 0| A| DOCK| PORT| PORT|
## | 1| B| PORT| DOCK| null|
## +---+----+------+------+------+

If this assumption is not correct, you'll have to pre-aggregate your data. For example, to keep the most common ship value:

from pyspark.sql.functions import max, struct

(df_data
    .groupby("id", "type", "date", "ship")
    .count()
    .groupby("id", "type")
    .pivot("date")
    .agg(max(struct("count", "ship")))
    .show())

## +---+----+--------+--------+--------+
## | id|type| 201601| 201602| 201603|
## +---+----+--------+--------+--------+
## | 2| C|[1,DOCK]| null| null|
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]|
## | 1| B|[1,PORT]|[1,DOCK]| null|
## +---+----+--------+--------+--------+
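
If you only want the ship value itself rather than the (count, ship) struct, you can pull the ship field out of each pivoted column afterwards. A minimal sketch, assuming the pivoted result above has been assigned to a variable named pivoted (a name not used in the original answer):

from pyspark.sql import functions as F

# Keep only the `ship` field of each pivoted struct column.
# `pivoted` is an assumed name for the DataFrame produced by the pivot above.
ship_only = pivoted.select(
    "id",
    "type",
    *[F.col(c)["ship"].alias(c) for c in pivoted.columns if c not in ("id", "type")]
)
ship_only.show()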

Spark Pivot String in PySpark

You need to add an aggregation after pivot(). If you are sure there is only one "VAL" for each ("ID", "VAR") pair, you can use first():

from pyspark.sql import functions as f

result = df.groupBy("ID").pivot("VAR").agg(f.first("VAL"))
result.show()

+----+------+-----+-----+
| ID| VAR_1|VAR_2|VAR_3|
+----+------+-----+-----+
|ID_1|Butter|Toast| Ham|
|ID_2| Jam|Toast| Egg|
+----+------+-----+-----+
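
As an optional refinement (not part of the original answer): if the distinct VAR values are known in advance, they can be passed to pivot() so Spark skips the extra job that collects them. A minimal sketch, assuming the three values shown above:

# Passing the pivot values explicitly avoids a separate pass to discover them.
result = df.groupBy("ID").pivot("VAR", ["VAR_1", "VAR_2", "VAR_3"]).agg(f.first("VAL"))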

Efficient way to pivot columns and group by in pyspark data frame

Maybe something slightly more efficient:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Compute the order of appearance of each type per id
w = Window.partitionBy('id', 'type').orderBy('s_id')
df = df.withColumn('order', F.rank().over(w))

# Concatenate the type with its order
df = df.withColumn('type', F.concat(F.col('type'), F.col('order'))).drop('order')
df.show()

+---+--------+----+-------+
| id| type|s_id| state|
+---+--------+----+-------+
| 1| ios1| 11| null|
| 1| ios2| 12| null|
| 1| ios3| 13| null|
| 1| ios4| 14| null|
| 3|android1| 18| null|
| 2| ios1| 21| not ok|
| 2|android1| 18|aborted|
| 1|android1| 15| ok|
| 1|android2| 16| not ok|
| 1|android3| 17|aborted|
+---+--------+----+-------+

Then pivot the dataframe and keep only the first 3 columns of each OS type:

# Choose the number of columns you want per OS type
n_type = 3
l_col = ['ios' + str(i + 1) for i in range(n_type)] + ['android' + str(i + 1) for i in range(n_type)]

df = df.groupBy('id').pivot('type').agg({'s_id': 'max'}).orderBy('id').select('id', *l_col)
df.show()

+---+--------+--------+--------+----+----+----+
| id|android1|android2|android3|ios1|ios2|ios3|
+---+--------+--------+--------+----+----+----+
| 1| 15| 16| 17| 11| 12| 13|
| 2| 18| null| null| 21|null|null|
| 3| 18| null| null|null|null|null|
+---+--------+--------+--------+----+----+----+

Then use your method to join and add the final column (a rough sketch of that step follows below).

EDIT: I added a list of columns to select only the required columns.
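
What that join looks like depends on the method referenced above; as a purely illustrative sketch, assuming the pivoted frame is called df_pivoted and the extra column comes from another DataFrame df_extra keyed by id (both names are assumptions, not from the original answer):

# Illustrative only: df_pivoted and df_extra are assumed names.
df_final = df_pivoted.join(df_extra, on='id', how='left')
df_final.show()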

Hive/pyspark: pivot non numeric data for huge dataset

You can use aggregations after pivoting; this also gives you the option to rename the resulting columns, like so:

import pyspark.sql.functions as F

(df
    .groupBy('emp_id')
    .pivot('rank')
    .agg(
        F.first('dept_id').alias('dept_id'),
        F.first('dept_name').alias('dept_name')
    )
    .show()
)

# Output
# +------+---------+-----------+---------+-----------+
# |emp_id|1_dept_id|1_dept_name|2_dept_id|2_dept_name|
# +------+---------+-----------+---------+-----------+
# | 1002| 101| sales| 102| marketing|
# | 1001| 101| sales| 102| marketing|
# +------+---------+-----------+---------+-----------+

Pivoting ArrayType columns in pyspark

// First, create some sample data (in Scala) and load it into a DataFrame.
import org.apache.spark.sql.functions._

val df = Seq(
  ("2017-08-01", "2343434545", Array("0.0", "0.0", "0.0", "0.0"), "Berlin"),
  ("2017-08-01", "2343434545", Array("0.0", "0.0", "0.0", "0.0"), "Rome"),
  ("2017-08-01", "2343434545", Array("0.0", "0.0", "0.0", "0.0"), "NewYork"),
  ("2017-08-01", "2343434545", Array("0.0", "0.0", "0.0", "0.0"), "Beijing"),
  ("2019-12-01", "6455534545", Array("0.0", "0.0", "0.0", "0.0"), "Berlin"),
  ("2019-12-01", "6455534545", Array("0.0", "0.0", "0.0", "0.0"), "Rome"),
  ("2019-12-01", "6455534545", Array("0.0", "0.0", "0.0", "0.0"), "NewYork"),
  ("2019-12-01", "6455534545", Array("0.0", "0.0", "0.0", "0.0"), "Beijing")
).toDF("date", "numeric_id", "feature_column", "city")

df.groupBy("date", "numeric_id")
  .pivot("city")
  .agg(collect_list("feature_column"))
  .withColumnRenamed("Beijing", "feature_column_Beijing")
  .withColumnRenamed("Berlin", "feature_column_Berlin")
  .withColumnRenamed("NewYork", "feature_column_NewYork")
  .withColumnRenamed("Rome", "feature_column_Rome")
  .show()

The output has one collected array column per city (feature_column_Beijing, feature_column_Berlin, feature_column_NewYork, feature_column_Rome), keyed by date and numeric_id.
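
Since the question asks about PySpark, here is a hedged translation of the same approach; a minimal sketch, assuming df is an equivalent PySpark DataFrame with date, numeric_id, feature_column, and city columns:

from pyspark.sql import functions as F

# Pivot on city and collect the ArrayType feature_column values per (date, numeric_id).
pivoted = (df.groupBy("date", "numeric_id")
             .pivot("city")
             .agg(F.collect_list("feature_column")))

# Prefix each pivoted city column, mirroring the renames in the Scala snippet.
for city in ["Beijing", "Berlin", "NewYork", "Rome"]:
    pivoted = pivoted.withColumnRenamed(city, "feature_column_" + city)

pivoted.show(truncate=False)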

Pivot and Concatenate columns in pyspark dataframe

Assuming you want 1 delimited string for each "mid", you can collect all "marks" with collect_list() and use concat_ws() to create the string:

import pyspark.sql.functions as F

df.groupby('mid').agg(F.concat_ws('|', F.collect_list('marks')).alias('marks_str')).show(truncate=False)

How to pivot on multiple columns in Spark SQL?

Here's a non-UDF way involving a single pivot (hence, just a single column scan to identify all the unique dates).

import pyspark.sql.functions as F

dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'), F.first('units').alias('unit'))

Here's the result (apologies for the non-matching ordering and naming):

+---+-------+------+-------+------+-------+------+-------+------+               
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+

We just aggregate both on the price and the unit column after pivoting on the day.

If the naming from the question is required:

dff.select([F.col(c).name('_'.join(x for x in c.split('_')[::-1])) for c in dff.columns]).show()

+---+-------+------+-------+------+-------+------+-------+------+
| id|price_1|unit_1|price_2|unit_2|price_3|unit_3|price_4|unit_4|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+

pivot one column into multiple columns in Pyspark/Python

A bit tricky, and it needs some more wrangling:

import pyspark.sql.functions as F
from pyspark.sql import Window

df2 = df.withColumn(
    'rn',
    F.row_number().over(Window.partitionBy('ID', 'Class').orderBy('Date'))
).withColumn(
    'mindate',
    F.min('Date').over(Window.partitionBy('ID', 'Class'))
).withColumn(
    'rn2',
    F.dense_rank().over(Window.partitionBy('ID').orderBy('mindate'))
).groupBy('ID', 'rn').pivot('rn2').agg(
    F.first(F.struct('Date', 'Class'))
).orderBy('ID', 'rn')

df3 = df2.select(
    'ID',
    *[f'{c}.*' for c in df2.columns[2:]]
)

df3.show(truncate=False)
+---+----------+-------------+----------+-------------+----------+---------------+
|ID |Date |Class |Date |Class |Date |Class |
+---+----------+-------------+----------+-------------+----------+---------------+
|1 |2021/01/01|math, english|2021/01/03|chinese |2021/01/04|math, chemistry|
|1 |2021/01/02|math, english|2021/01/06|chinese |null |null |
|1 |2021/01/05|math, english|null |null |null |null |
|2 |2021/01/01|PE |2021/01/02|math, chinese|2021/01/03|math, english |
|2 |null |null |2021/01/04|math, chinese|null |null |
+---+----------+-------------+----------+-------------+----------+---------------+
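
Note that df3 as shown ends up with repeated Date and Class column names. If distinct names are needed, one hedged alternative (not from the original answer) is to expand each pivoted struct with explicit aliases instead of the .* expansion:

# Illustrative alternative: give each expanded column a distinct name (e.g. Date_1, Class_1, ...).
df3 = df2.select(
    'ID',
    *[F.col(c)[field].alias(f'{field}_{c}')
      for c in df2.columns[2:]
      for field in ('Date', 'Class')]
)
df3.show(truncate=False)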

