How to pivot on multiple columns in Spark SQL?
Here's a non-UDF way involving a single pivot (hence, just a single column scan to identify all the unique dates).
dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'), F.first('units').alias('unit'))
Here's the result (apologies for the non-matching ordering and naming):
+---+-------+------+-------+------+-------+------+-------+------+
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+
We simply aggregate both the price and the units columns after pivoting on the day.
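For reference, here is an input that reproduces the table above (reconstructed from the pivoted output, since the question's original data isn't shown here):
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
# reconstructed so that groupBy('id').pivot('day') yields the table above
mydf = spark.createDataFrame(
    [(100, 1, 23, 10), (100, 2, 45, 11), (100, 3, 67, 12), (100, 4, 78, 13),
     (101, 1, 23, 10), (101, 2, 45, 13), (101, 3, 67, 14), (101, 4, 78, 15),
     (102, 1, 23, 10), (102, 2, 45, 11), (102, 3, 67, 16), (102, 4, 78, 18)],
    ['id', 'day', 'price', 'units'])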
If you need the naming as in the question:
dff.select([F.col(c).name('_'.join(x for x in c.split('_')[::-1])) for c in dff.columns]).show()
+---+-------+------+-------+------+-------+------+-------+------+
| id|price_1|unit_1|price_2|unit_2|price_3|unit_3|price_4|unit_4|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+
How can I pivot on multiple columns separately in pyspark
Following the link from @mrjoseph, I came up with the following solution. It works and it's cleaner, but I still don't like the joins...
from pyspark.sql import functions as sf

def pivot_udf(df, *cols):
    mydf = df.select('id').drop_duplicates()
    for c in cols:
        mydf = mydf.join(
            df
            .withColumn('combcol', sf.concat(sf.lit('{}_'.format(c)), df[c]))
            .groupby('id').pivot('combcol').agg(sf.count(c)),
            how='left',
            on='id'
        )
    return mydf

pivot_udf(sdf, 'col1', 'col2').show()
+---+---------+---------+---------+---------+---------+
| id|col1_str1|col1_str2|col1_str3|col2_str4|col2_str5|
+---+---------+---------+---------+---------+---------+
| 1| 2| 2| 1| 3| 2|
| 2| null| 2| 2| 3| 1|
+---+---------+---------+---------+---------+---------+
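If the joins bother you too, here is a join-free variant (my own sketch, not from the linked answer): unpivot every column into one tagged column first, then pivot once.
from functools import reduce
from pyspark.sql import functions as sf

def pivot_single(df, *cols):
    # unpivot: one row per (id, tagged value), tagging each value with its source column
    stacked = reduce(
        lambda a, b: a.unionByName(b),
        [df.select('id', sf.concat(sf.lit('{}_'.format(c)), df[c]).alias('combcol'))
         for c in cols])
    # a single pivot replaces the per-column joins
    return stacked.groupby('id').pivot('combcol').agg(sf.count('combcol'))
pivot_single(sdf, 'col1', 'col2').show() should produce the same counts as above, with missing combinations coming out as null just as they do with the left joins.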
How to pivot on more than one column for a spark dataframe?
You can pass an array column to pivot:
val df2 = df.groupBy("id", "address")
  .pivot(array("name", "age"),
    Seq(array(lit("John"), lit("30")).as("c1"),
        array(lit("Mike"), lit("40")).as("c2")))
  .agg(sum($"age").as("a"), avg($"class").as("c"))
df2.show
+---+-------+----+----+----+----+
| id|address|c1_a|c1_c|c2_a|c2_c|
+---+-------+----+----+----+----+
|200|Street2|null|null|null|null|
|100|Street1|30.0| 1.0|null|null|
|400|Street4|null|null|null|null|
|300|Street3|null|null|null|null|
+---+-------+----+----+----+----+
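PySpark's GroupedData.pivot takes a column name rather than a Column expression, so the closest PySpark analogue (a sketch I'm adding here, reusing the columns above) builds the combined key column first:
from pyspark.sql import functions as F

df2 = (df
    .withColumn('key', F.concat_ws('_', 'name', 'age'))
    .groupBy('id', 'address')
    .pivot('key', ['John_30', 'Mike_40'])  # the same two combinations as c1 and c2
    .agg(F.sum('age').alias('a'), F.avg('class').alias('c')))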
Pivot on multiple columns dynamically in Spark Dataframe
Use the built-in concatenation function instead; it allows for a variable number of input columns. See the documentation.
In this case, you can do:
import org.apache.spark.sql.functions._
domainDF
  .withColumn("combColumn", concat(Seq($"col1", $"col2"): _*))
  .groupBy("someCol").pivot("combColumn").agg(count("*"))
If you want a separator between the column values, use concat_ws instead. For example, to use a space: concat_ws(" ", Seq(...): _*).
If you need to use a UDF due to other concerns, you can still accept a variable number of arguments by wrapping them in an array; see: Spark UDF with varargs.
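For the UDF route, the varargs-via-array trick looks like this in PySpark (a sketch; the Scala version in the linked answer works the same way):
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# wrapping the columns in array() lets one UDF accept any number of them
combine = F.udf(lambda parts: '_'.join(p for p in parts if p is not None), StringType())
df = df.withColumn('combColumn', combine(F.array('col1', 'col2')))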
Spark : Pivot with multiple columns
First rename the columns and apply a self-join:
val leftRight = df
.withColumnRenamed("category", "left")
.join(df.withColumnRenamed("category", "right"), Seq("id"))
to get co-occurrences for each id. Next, apply crosstab:
leftRight.stat.crosstab("left", "right")
to aggregate data across all ids. The result is:
+----------+---+---+---+---+---+
|left_right| A| B| C| D| E|
+----------+---+---+---+---+---+
| E| 0| 0| 0| 0| 1|
| A| 1| 1| 1| 0| 0|
| B| 1| 2| 1| 0| 0|
| C| 1| 1| 2| 0| 0|
| D| 0| 0| 0| 1| 0|
+----------+---+---+---+---+---+
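The same two steps in PySpark, for reference (a sketch assuming a df with id and category columns as above):
left = df.withColumnRenamed('category', 'left')
right = df.withColumnRenamed('category', 'right')
left.join(right, 'id').stat.crosstab('left', 'right').show()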
Create multiple columns by pivoting even when pivoted value doesn't exist
This operation is called pivoting. A few things to note:
- a couple of aggregations, since you need both the count of ID and the sum of Sales
- alias for the aggregations, to change the resulting column names
- providing values in pivot, for cases where you want columns for Category C even though C doesn't appear in the data. Providing values also boosts performance, since Spark doesn't have to run a separate job to collect the distinct pivot values first.
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, 'A', 123, 23),
(2, 'A', 123, 45),
(1, 'A', 234, 67),
(1, 'B', 567, 78),
(2, 'B', 567, 34),
(3, 'D', 789, 12),
(1, 'A', 890, 12)],
['Store_ID', 'Category', 'ID', 'Sales'])
Script:
df = (df
.groupBy('Store_ID')
.pivot('Category', ['A', 'B', 'C', 'D'])
.agg(
F.countDistinct('ID').alias('ID'),
F.sum('Sales').alias('Sales'))
.fillna(0))
df.show()
# +--------+----+-------+----+-------+----+-------+----+-------+
# |Store_ID|A_ID|A_Sales|B_ID|B_Sales|C_ID|C_Sales|D_ID|D_Sales|
# +--------+----+-------+----+-------+----+-------+----+-------+
# | 1| 3| 102| 1| 78| 0| 0| 0| 0|
# | 3| 0| 0| 0| 0| 0| 0| 1| 12|
# | 2| 1| 45| 1| 34| 0| 0| 0| 0|
# +--------+----+-------+----+-------+----+-------+----+-------+
How to pivot a DataFrame in PySpark on multiple columns?
As mentioned in the comments, here is a solution to pivot your data: concat your columns a_id and b_id under a new column c_id, group by date, then pivot on c_id and use values as you see fit.
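A sketch of that approach (column names follow the question; 'value' is a placeholder for whatever metric you pivot):
from pyspark.sql import functions as F

(df
 .withColumn('c_id', F.concat_ws('_', 'a_id', 'b_id'))
 .groupBy('date')
 .pivot('c_id')
 .agg(F.first('value')))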
As for resampling, I'd point you to the solution provided by @zero323 here.