Pyspark: how to duplicate a row n times in a dataframe?
The explode function returns a new row for each element in the given array or map. One way to use it here is to build, with a udf, an array of length n for each row, and then explode the resulting array.
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, IntegerType
df = spark.createDataFrame([(1,2,1), (2,9,1), (3,8,2), (4,1,1), (5,3,3)] ,["A", "B", "n"])
+---+---+---+
| A| B| n|
+---+---+---+
| 1| 2| 1|
| 2| 9| 1|
| 3| 8| 2|
| 4| 1| 1|
| 5| 3| 3|
+---+---+---+
# udf that turns the integer n into an array holding n copies of n
n_to_array = udf(lambda n: [n] * n, ArrayType(IntegerType()))
df2 = df.withColumn('n', n_to_array(df.n))
df2.show()
+---+---+---------+
| A| B| n|
+---+---+---------+
| 1| 2| [1]|
| 2| 9| [1]|
| 3| 8| [2, 2]|
| 4| 1| [1]|
| 5| 3|[3, 3, 3]|
+---+---+---------+
# now use explode
df2.withColumn('n', explode(df2.n)).show()
+---+---+---+
| A| B| n|
+---+---+---+
| 1| 2| 1|
| 2| 9| 1|
| 3| 8| 2|
| 3| 8| 2|
| 4| 1| 1|
| 5| 3| 3|
| 5| 3| 3|
| 5| 3| 3|
+---+---+---+
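On Spark 2.4+, the udf can be avoided entirely with the built-in array_repeat function; a minimal sketch of the same idea (the int() cast is needed because array_repeat expects an integer count):
from pyspark.sql.functions import expr, explode
# array_repeat(n, n) builds an array holding n copies of n, no Python udf needed
df.withColumn('n', explode(expr('array_repeat(n, int(n))'))).show()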
duplicating records between date gaps within a selected time interval in a PySpark dataframe
Building on the comment from @jxc, I have prepared an answer for this use case. The code snippets follow.
Import the Spark SQL functions:
from pyspark.sql import functions as F, Window
Prepare the sample data:
simpleData = ((1, "Available", 5, "2020-07"),
              (1, "Available", 8, "2020-08"),
              (1, "Limited", 8, "2020-12"),
              (2, "Limited", 1, "2020-09"),
              (2, "Limited", 3, "2020-12"))
columns = ["product_id", "status", "price", "month"]
Create a dataframe from the sample data:
df = spark.createDataFrame(data = simpleData, schema = columns)
Add a date column to the dataframe to get a properly formatted date:
df0 = df.withColumn("date",F.to_date('month','yyyy-MM'))
df0.show()
+----------+---------+-----+-------+----------+
|product_id| status|price| month| date|
+----------+---------+-----+-------+----------+
| 1|Available| 5|2020-07|2020-07-01|
| 1|Available| 8|2020-08|2020-08-01|
| 1| Limited| 8|2020-12|2020-12-01|
| 2| Limited| 1|2020-09|2020-09-01|
| 2| Limited| 3|2020-12|2020-12-01|
+----------+---------+-----+-------+----------+
- Create a window spec w1 and use the window function lead to find the next date over(w1), then shift it back one month with add_months (falling back to the row's own date for the last row) to set up each row's end_date:
w1 = Window.partitionBy('product_id').orderBy('date')
df1 = df0.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
df1.show()
+----------+---------+-----+-------+----------+----------+
|product_id| status|price| month| date| end_date|
+----------+---------+-----+-------+----------+----------+
| 1|Available| 5|2020-07|2020-07-01|2020-07-01|
| 1|Available| 8|2020-08|2020-08-01|2020-11-01|
| 1| Limited| 8|2020-12|2020-12-01|2020-12-01|
| 2| Limited| 1|2020-09|2020-09-01|2020-11-01|
| 2| Limited| 3|2020-12|2020-12-01|2020-12-01|
+----------+---------+-----+-------+----------+----------+
- Use months_between(end_date, date) to calculate the number of months between the two dates, use the transform function to iterate through sequence(0, #months) and build a named_struct with date=add_months(date,i) and price (the original IF(i=0,price,price) always evaluates to price, so it is kept only for fidelity), then use inline_outer to explode the array of structs. Note that the whole expression must be passed to selectExpr as a SQL string:
df2 = df1.selectExpr(
    "product_id",
    "status",
    """
    inline_outer(
      transform(
        sequence(0, int(months_between(end_date, date)), 1),
        i -> (add_months(date, i) as date, IF(i=0, price, price) as price)
      )
    )
    """)
df2.show()
+----------+---------+----------+-----+
|product_id| status| date|price|
+----------+---------+----------+-----+
| 1|Available|2020-07-01| 5|
| 1|Available|2020-08-01| 8|
| 1|Available|2020-09-01| 8|
| 1|Available|2020-10-01| 8|
| 1|Available|2020-11-01| 8|
| 1| Limited|2020-12-01| 8|
| 2| Limited|2020-09-01| 1|
| 2| Limited|2020-10-01| 1|
| 2| Limited|2020-11-01| 1|
| 2| Limited|2020-12-01| 3|
+----------+---------+----------+-----+
- Partition the dataframe on product_id and add a rank column in df3 to get the row number for each row. Then store the maximum of the rank column as a new column max_rank for each product_id in df4:
w2 = Window.partitionBy('product_id').orderBy('date')
df3 = df2.withColumn('rank',F.row_number().over(w2))
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int]
df3.show()
+----------+---------+----------+-----+----+
|product_id| status| date|price|rank|
+----------+---------+----------+-----+----+
| 1|Available|2020-07-01| 5| 1|
| 1|Available|2020-08-01| 8| 2|
| 1|Available|2020-09-01| 8| 3|
| 1|Available|2020-10-01| 8| 4|
| 1|Available|2020-11-01| 8| 5|
| 1| Limited|2020-12-01| 8| 6|
| 2| Limited|2020-09-01| 1| 1|
| 2| Limited|2020-10-01| 1| 2|
| 2| Limited|2020-11-01| 1| 3|
| 2| Limited|2020-12-01| 3| 4|
+----------+---------+----------+-----+----+
df4 = df3.groupBy("product_id").agg(F.max('rank').alias('max_rank'))
Schema: DataFrame[product_id: bigint, max_rank: int]
df4.show()
+----------+--------+
|product_id|max_rank|
+----------+--------+
| 1| 6|
| 2| 4|
+----------+--------+
- Join the df3 and df4 dataframes on product_id to get max_rank:
df5 = df3.join(df4,df3.product_id == df4.product_id,"inner") \
.select(df3.product_id,df3.status,df3.date,df3.price,df3.rank,df4.max_rank)
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int, max_rank: int]
df5.show()
+----------+---------+----------+-----+----+--------+
|product_id| status| date|price|rank|max_rank|
+----------+---------+----------+-----+----+--------+
| 1|Available|2020-07-01| 5| 1| 6|
| 1|Available|2020-08-01| 8| 2| 6|
| 1|Available|2020-09-01| 8| 3| 6|
| 1|Available|2020-10-01| 8| 4| 6|
| 1|Available|2020-11-01| 8| 5| 6|
| 1| Limited|2020-12-01| 8| 6| 6|
| 2| Limited|2020-09-01| 1| 1| 4|
| 2| Limited|2020-10-01| 1| 2| 4|
| 2| Limited|2020-11-01| 1| 3| 4|
| 2| Limited|2020-12-01| 3| 4| 4|
+----------+---------+----------+-----+----+--------+
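As a side note, the groupBy in df4 and the join into df5 can be collapsed into a single window aggregate; a minimal sketch under the same names (not part of the original answer):
# max over an unordered window per product_id replaces the groupBy + join
w3 = Window.partitionBy('product_id')
df5_alt = df3.withColumn('max_rank', F.max('rank').over(w3))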
- Then finally filter the df5 dataframe using the between function (inclusive on both ends, hence a lower bound of max_rank - 5) to get the latest 6 months of data:
FinalResultDF = df5.filter(F.col('rank') \
    .between(F.when((F.col('max_rank') > 5), (F.col('max_rank') - 5)).otherwise(0), F.col('max_rank'))) \
.select(df5.product_id,df5.status,df5.date,df5.price)
FinalResultDF.show(truncate=False)
+----------+---------+----------+-----+
|product_id|status |date |price|
+----------+---------+----------+-----+
|1 |Available|2020-07-01|5 |
|1 |Available|2020-08-01|8 |
|1 |Available|2020-09-01|8 |
|1 |Available|2020-10-01|8 |
|1 |Available|2020-11-01|8 |
|1 |Limited |2020-12-01|8 |
|2 |Limited |2020-09-01|1 |
|2 |Limited |2020-10-01|1 |
|2 |Limited |2020-11-01|1 |
|2 |Limited |2020-12-01|3 |
+----------+---------+----------+-----+
Duplicate rows in a Pyspark Dataframe
Import the required functions from pyspark.sql.functions:
from pyspark.sql.functions import array, explode, lit
and replace the existing column:
df.withColumn("x4", explode(array(lit(1), df["x4"])))
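A quick illustration on a made-up two-row frame (the column name x4 is just the one assumed by the question): array(lit(1), df["x4"]) packs a constant and the original value into a two-element array, so explode emits each row twice.
df = spark.createDataFrame([(1, 10), (2, 20)], ["x1", "x4"])
# each input row comes out twice: once with x4 = 1, once with its original x4
df.withColumn("x4", explode(array(lit(1), df["x4"]))).show()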
Drop the duplicated rows and merge the ids using groupby in pyspark
Using this input dataframe:
df = spark.createDataFrame([
    ("e5882", None, "M", "AD", "9/14/2021 13:50"),
    ("e5882", None, "M", "AD", "10/22/2021 13:10"),
    ("5cddf", None, "M", "ED", "9/9/2021 12:00"),
    ("5cddf", "2010", None, "ED", "9/9/2021 12:00"),
    ("c3882", None, "M", "BD", "11/27/2021 5:00"),
    ("c3882", "1975", None, "BD", "11/27/2021 5:00"),
    ("9297d", None, "M", "GF", "10/18/2021 7:00"),
    ("9297d", "1999", None, "GF", "10/18/2021 7:00"),
    ("9298e", "1990", None, "GF", "10/18/2021 7:00"),
], ["id", "yob", "gender", "country", "timestamp"])
- If the same id has different timestamp values, we want to pick the most recent timestamp.
Use a window ranking function to get the most recent row per id. Since rows sharing that timestamp will be merged later, use dense_rank instead of row_number. But first you need to convert the timestamp strings into TimestampType, otherwise the comparison won't be correct (as strings, '9/9/2021 12:00' > '10/18/2021 7:00'):
from pyspark.sql import Window
import pyspark.sql.functions as F
df_most_recent = df.withColumn(
    "timestamp",
    F.to_timestamp("timestamp", "M/d/yyyy H:mm")
).withColumn(
    "rn",
    F.dense_rank().over(Window.partitionBy("id").orderBy(F.desc("timestamp")))
).filter("rn = 1")
- If rows with the same id have the same timestamp but some of the columns contain nulls (yob and gender), we want to merge both rows for that id into a single record without nulls. Below I have pasted the data frame and the desired output.
Since the above df_most_recent contains one or more rows sharing the same most recent timestamp per id, you can group by id to merge the values of the other columns like this:
result = df_most_recent.groupBy("id").agg(
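    # collect_set drops nulls and duplicates, so element [0] is a non-null value whenever one exists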
    *[F.collect_set(c)[0].alias(c) for c in df.columns if c != 'id']
    # or *[F.first(c, ignorenulls=True).alias(c) for c in df.columns if c != 'id']
)
result.show()
#+-----+----+------+-------+-------------------+
#|id |yob |gender|country|timestamp |
#+-----+----+------+-------+-------------------+
#|5cddf|2010|M |ED |2021-09-09 12:00:00|
#|9297d|1999|M |GF |2021-10-18 07:00:00|
#|9298e|1990|null |GF |2021-10-18 07:00:00|
#|c3882|1975|M |BD |2021-11-27 05:00:00|
#|e5882|null|M |AD |2021-10-22 13:10:00|
#+-----+----+------+-------+-------------------+
pyspark: duplicate row with column value from another row
df = spark.createDataFrame(
    [
        ('2022-03-11 14:00:00', '1'),
        ('2022-03-11 15:00:00', '2'),
        ('2022-03-11 16:00:00', '3')
    ], ['windowStart', 'nodeId'])
from pyspark.sql import Window as W
from pyspark.sql import functions as F
w = W.orderBy('windowStart')
# shift each row's windowStart to the next row's value (lead looks one row
# ahead), so every nodeId is copied into the following window
df_next = df\
    .withColumn('next', F.lead(F.col("windowStart"), 1).over(w))\
    .select(F.col('next').alias('windowStart'), 'nodeId')\
    .filter(F.col('windowStart').isNotNull())
df.union(df_next)\
.orderBy('windowStart', 'nodeId')\
.show()
+-------------------+------+
| windowStart|nodeId|
+-------------------+------+
|2022-03-11 14:00:00| 1|
|2022-03-11 15:00:00| 1|
|2022-03-11 15:00:00| 2|
|2022-03-11 16:00:00| 2|
|2022-03-11 16:00:00| 3|
+-------------------+------+
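For reference, the union can also be written as a single explode over the current and next window starts; a minimal sketch against the same df and w (the nextStart name is just illustrative):
# compute the next windowStart once, then emit both values per row;
# the null produced by the last row's lead is filtered out
df_both = df.withColumn('nextStart', F.lead('windowStart', 1).over(w))
df_both.select(
    F.explode(F.array('windowStart', 'nextStart')).alias('windowStart'),
    'nodeId'
).filter(F.col('windowStart').isNotNull())\
 .orderBy('windowStart', 'nodeId')\
 .show()
Either way, note that W.orderBy('windowStart') without a partitionBy moves all rows into a single partition (Spark logs a warning about it), which is fine for a toy frame but worth addressing on real data.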