How to Pivot on Multiple Columns in Spark SQL

How to pivot on multiple columns in Spark SQL?

Here's a non-UDF way involving a single pivot (hence, just a single column scan to identify all the unique dates).

from pyspark.sql import functions as F

dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'), F.first('units').alias('unit'))

Here's the result (apologies for the non-matching ordering and naming):

+---+-------+------+-------+------+-------+------+-------+------+               
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
|102|     23|    10|     45|    11|     67|    16|     78|    18|
+---+-------+------+-------+------+-------+------+-------+------+

We simply aggregate on both the price and the units columns after pivoting on the day.

If the column naming from the question is required:

# reverse the parts of each column name, e.g. '1_price' -> 'price_1'
dff.select([F.col(c).alias('_'.join(c.split('_')[::-1])) for c in dff.columns]).show()

+---+-------+------+-------+------+-------+------+-------+------+
| id|price_1|unit_1|price_2|unit_2|price_3|unit_3|price_4|unit_4|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
|102|     23|    10|     45|    11|     67|    16|     78|    18|
+---+-------+------+-------+------+-------+------+-------+------+
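For reference, the results above can be reproduced with a long-format input like this (the exact rows of mydf are reconstructed from the output shown, so treat them as an assumption):

# one row per (id, day), with a price and a unit count
mydf = spark.createDataFrame(
    [(100, 1, 23, 10), (100, 2, 45, 11), (100, 3, 67, 12), (100, 4, 78, 13),
     (101, 1, 23, 10), (101, 2, 45, 13), (101, 3, 67, 14), (101, 4, 78, 15),
     (102, 1, 23, 10), (102, 2, 45, 11), (102, 3, 67, 16), (102, 4, 78, 18)],
    ['id', 'day', 'price', 'units'])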

How can I pivot on multiple columns separately in pyspark

Following the link from @mrjoseph, I came up with the solution below.
It works and it's cleaner, but I still don't like the joins...

import pyspark.sql.functions as sf

def pivot_udf(df, *cols):
    mydf = df.select('id').drop_duplicates()
    for c in cols:
        mydf = mydf.join(
            df.withColumn('combcol', sf.concat(sf.lit('{}_'.format(c)), df[c]))
              .groupby('id')
              .pivot('combcol')
              .agg(sf.count(c)),
            how='left',
            on='id'
        )
    return mydf

pivot_udf(sdf, 'col1','col2').show()

+---+---------+---------+---------+---------+---------+
| id|col1_str1|col1_str2|col1_str3|col2_str4|col2_str5|
+---+---------+---------+---------+---------+---------+
|  1|        2|        2|        1|        3|        2|
|  2|     null|        2|        2|        3|        1|
+---+---------+---------+---------+---------+---------+
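For reference, an input shaped like this reproduces the counts above (the rows of sdf are reconstructed from the output, so treat them as an assumption):

sdf = spark.createDataFrame(
    [(1, 'str1', 'str4'), (1, 'str1', 'str4'), (1, 'str2', 'str4'),
     (1, 'str2', 'str5'), (1, 'str3', 'str5'), (2, 'str2', 'str4'),
     (2, 'str2', 'str4'), (2, 'str3', 'str4'), (2, 'str3', 'str5')],
    ['id', 'col1', 'col2'])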

How to pivot on more than one column for a spark dataframe?

You can pass an array column to pivot:

import org.apache.spark.sql.functions._
import spark.implicits._

val df2 = df.groupBy("id", "address")
  .pivot(array("name", "age"),
    Seq(array(lit("John"), lit("30")).as("c1"),
        array(lit("Mike"), lit("40")).as("c2")))
  .agg(sum('age).as("a"), avg('class).as("c"))

df2.show
+---+-------+----+----+----+----+
| id|address|c1_a|c1_c|c2_a|c2_c|
+---+-------+----+----+----+----+
|200|Street2|null|null|null|null|
|100|Street1|30.0| 1.0|null|null|
|400|Street4|null|null|null|null|
|300|Street3|null|null|null|null|
+---+-------+----+----+----+----+

Pivot on multiple columns dynamically in Spark Dataframe

Use the built-in concatenation function instead; it allows for a variable number of input columns. See the documentation.

In this case, you can do:

import org.apache.spark.sql.functions._
import spark.implicits._

domainDF.withColumn("combColumn", concat(Seq($"col1", $"col2"): _*))
  .groupBy("someCol").pivot("combColumn").agg(count(lit(1)))  // count rows per pivoted value

If you want a separator between the column values, use concat_ws. For example, to use a space: concat_ws(" ", Seq(...): _*).


If you need to use a UDF due to other concerns, it's possible to pass a variable number of arguments by wrapping them in an array; see: Spark UDF with varargs
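For illustration, a rough PySpark sketch of that same array-wrapping idea (not the exact UDF from the linked answer), reusing the domainDF / someCol / col1 / col2 names from the snippet above:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# the UDF receives a single array argument instead of N separate arguments
combine = F.udf(lambda values: "_".join(str(v) for v in values), StringType())

(domainDF
    .withColumn("combColumn", combine(F.array("col1", "col2")))
    .groupBy("someCol")
    .pivot("combColumn")
    .agg(F.count(F.lit(1))))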

Spark : Pivot with multiple columns

First rename the columns and apply a self-join:

val leftRight = df
  .withColumnRenamed("category", "left")
  .join(df.withColumnRenamed("category", "right"), Seq("id"))

to get co-occurrences for each id. Next apply crosstab:

leftRight.stat.crosstab("left", "right")

to aggregate data across all ids. The result is:

+----------+---+---+---+---+---+
|left_right|  A|  B|  C|  D|  E|
+----------+---+---+---+---+---+
|         E|  0|  0|  0|  0|  1|
|         A|  1|  1|  1|  0|  0|
|         B|  1|  2|  1|  0|  0|
|         C|  1|  1|  2|  0|  0|
|         D|  0|  0|  0|  1|  0|
+----------+---+---+---+---+---+
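For reference, the same rename + self-join + crosstab approach translates directly to PySpark (assuming, as above, a DataFrame df with columns id and category):

# rename + self-join to get per-id co-occurrences, then crosstab to aggregate
left_right = (df.withColumnRenamed("category", "left")
    .join(df.withColumnRenamed("category", "right"), on="id"))

left_right.stat.crosstab("left", "right").show()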

Create multiple columns by pivoting even when pivoted value doesn't exist

This operation is called pivoting. A few things to note:

  • a couple of aggregations, since you need both the count of ID and the sum of Sales
  • aliases for the aggregations, to change the column names
  • explicit values in pivot, for cases where you want numbers for Category C even though C doesn't exist in the data. Providing values boosts performance too.

Input:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, 'A', 123, 23),
     (2, 'A', 123, 45),
     (1, 'A', 234, 67),
     (1, 'B', 567, 78),
     (2, 'B', 567, 34),
     (3, 'D', 789, 12),
     (1, 'A', 890, 12)],
    ['Store_ID', 'Category', 'ID', 'Sales'])

Script:

df = (df
    .groupBy('Store_ID')
    .pivot('Category', ['A', 'B', 'C', 'D'])
    .agg(
        F.countDistinct('ID').alias('ID'),
        F.sum('Sales').alias('Sales'))
    .fillna(0))
df.show()
# +--------+----+-------+----+-------+----+-------+----+-------+
# |Store_ID|A_ID|A_Sales|B_ID|B_Sales|C_ID|C_Sales|D_ID|D_Sales|
# +--------+----+-------+----+-------+----+-------+----+-------+
# |       1|   3|    102|   1|     78|   0|      0|   0|      0|
# |       3|   0|      0|   0|      0|   0|      0|   1|     12|
# |       2|   1|     45|   1|     34|   0|      0|   0|      0|
# +--------+----+-------+----+-------+----+-------+----+-------+

How to pivot a DataFrame in PySpark on multiple columns?

As mentioned in the comments, here is a solution to pivot your data:

You should concat your columns a_id and b_id into a new column c_id, group by date, then pivot on c_id and use the values as you see fit.
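For illustration, a minimal PySpark sketch of that approach (the aggregated column, here called value, is an assumption; a_id, b_id, and date come from the question):

from pyspark.sql import functions as F

pivoted = (df
    .withColumn("c_id", F.concat_ws("_", "a_id", "b_id"))  # combine a_id and b_id
    .groupBy("date")
    .pivot("c_id")
    .agg(F.sum("value")))  # 'value' is a placeholder for whatever you aggregate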

As for resampling, I'd point you to the solution provided by @zero323 here.


