collect_list by preserving order based on another variable
If you collect both dates and values as a list, you can sort the resulting column according to date using a UDF, and then keep only the values in the result.
import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df.groupby("id") \
    .agg(F.collect_list(F.struct("date", "value")) \
    .alias("list_col"))

# define udf that sorts the structs by date and keeps only the values
def sorter(l):
    res = sorted(l, key=operator.itemgetter(0))
    return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df.select("id", sort_udf("list_col") \
    .alias("sorted_list")) \
    .show(truncate=False)
+---+----------------+
|id |sorted_list |
+---+----------------+
|1 |[10, 5, 15, 20] |
|2 |[100, 500, 1500]|
+---+----------------+
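On Spark 2.4+ you can likely skip the Python UDF entirely: sort the collected structs with the built-in sort_array (which orders by the first struct field, here the date) and then extract the value field. A minimal sketch, assuming the same input_df as above:
import pyspark.sql.functions as F
# no UDF needed: sort_array orders the (date, value) structs by date
no_udf_df = input_df.groupby("id") \
    .agg(F.sort_array(F.collect_list(F.struct("date", "value"))).alias("list_col")) \
    .select("id", F.col("list_col.value").alias("sorted_list"))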
Groupby and collect_list maintaining order based on another column in PySpark
You can try:
spark_df.orderBy('dateCol3', ascending=True).groupBy('dateCol1', 'dateCol2').agg(F.collect_list('Name'))
Alternatively, although it would be a bit of overkill, you can use windowing. Note that the frame must be widened to the whole partition; otherwise each row only sees a cumulative prefix of the list and distinct() cannot collapse each group to a single row:
from pyspark.sql import functions as F
from pyspark.sql import Window as w
spark_df.select(
    'dateCol1', 'dateCol2',
    F.collect_list('Name').over(
        w.partitionBy(['dateCol1', 'dateCol2'])
         .orderBy(F.col('dateCol3'))
         .rowsBetween(w.unboundedPreceding, w.unboundedFollowing)
    ).alias('Name')
).distinct()
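A quick demo of either approach (the data values here are invented for illustration):
spark_df = spark.createDataFrame(
    [("2020-01-01", "2020-01-31", "2020-01-05", "Alice"),
     ("2020-01-01", "2020-01-31", "2020-01-02", "Bob")],
    ["dateCol1", "dateCol2", "dateCol3", "Name"])
# both approaches should yield Name = [Bob, Alice] for this group,
# since Bob's dateCol3 sorts first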
How to maintain sort order in PySpark collect_list and collect multiple lists
Yes, one way is to add successive .withColumn statements that build the ordered lists over a window, followed by a groupBy and an .agg that keeps only the complete array for each group:
from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data \
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)) \
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w)) \
    .groupBy('Syscode_Stn') \
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'),  # F.max picks the longest
         F.max('MarchMadInd').alias('MarchMadInd'))      # (complete) array per group
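On Spark 2.4 or later a window is arguably unnecessary: you can collect the ordering column together with both value columns into a single struct, sort once, and split the result back into aligned arrays. A hedged sketch reusing the column names above:
import pyspark.sql.functions as F
# sort_array orders the structs by their first field (tuning_evnt_start_dt),
# so both extracted arrays stay aligned with each other
sorted_df = train_data.groupBy('Syscode_Stn').agg(
    F.sort_array(F.collect_list(F.struct(
        'tuning_evnt_start_dt', 'spp_imp_daily', 'MarchMadInd'))).alias('s'))
sorted_df = sorted_df.select(
    'Syscode_Stn',
    F.col('s.spp_imp_daily').alias('spp_imp_daily'),
    F.col('s.MarchMadInd').alias('MarchMadInd'))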
preserve order based on another variable when calling collect_list using sparklyr
Solved! I misunderstood how collect_list() and Spark SQL could work together. I didn't realize a list could be returned; I thought the concatenation had to take place within the query. The following produces the desired result:
spark_output <- spark_session(sc) %>%
  sparklyr::invoke("sql",
                   "SELECT userid, collect_list(city)
                    OVER (PARTITION BY userid
                          ORDER BY date
                          ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
                    AS cities
                    FROM my_sdf") %>%
  sdf_register() %>%
  group_by(userid) %>%
  filter(row_number(userid) == 1) %>%
  ungroup() %>%
  mutate(cities = paste(cities, sep = " > ")) %>%
  sdf_register()
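For comparison, a rough PySpark equivalent of that SQL window query (a sketch only; my_sdf is assumed to have userid, date, and city columns, and array_join needs Spark 2.4+):
from pyspark.sql import functions as F, Window as W
# every row in the partition sees the full city list thanks to the explicit frame
w = W.partitionBy('userid').orderBy('date') \
     .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
cities_df = my_sdf \
    .withColumn('cities', F.collect_list('city').over(w)) \
    .withColumn('rn', F.row_number().over(W.partitionBy('userid').orderBy('date'))) \
    .where(F.col('rn') == 1) \
    .select('userid', F.array_join('cities', ' > ').alias('cities'))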
Pyspark - Preserve order of collect list and collect set over multiple columns
Use the monotonically_increasing_id function from Spark to maintain the order.
#InputDF
# +----+----+----+-------+
# |col1|col2|col3|   col4|
# +----+----+----+-------+
# |   1|   A|  U1|  12345|
# |   1|   A|  A1|549BZ4G|
# +----+----+----+-------+

df1 = df.withColumn("id", F.monotonically_increasing_id()) \
    .groupby("col1", "col2") \
    .agg(F.collect_list("col4").alias("col4"),
         F.collect_list("col3").alias("col3"))

df1.select("col1", "col2",
           F.array_join("col3", ",").alias("col3"),
           F.array_join("col4", ",").alias("col4")).show()

# OutputDF
# +----+----+-----+-------------+
# |col1|col2| col3|         col4|
# +----+----+-----+-------------+
# |   1|   A|U1,A1|12345,549BZ4G|
# +----+----+-----+-------------+
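Note that the generated id is not referenced in the aggregation above, so the list order still comes from collect_list seeing the rows in input order. If you want the ordering to be explicit rather than incidental, one option (a sketch, assuming Spark 2.4+) is to sort the collected structs by the id:
import pyspark.sql.functions as F
# sort by the generated id so the output order no longer depends on how
# the rows happen to arrive at the aggregation
df1 = df.withColumn("id", F.monotonically_increasing_id()) \
    .groupby("col1", "col2") \
    .agg(F.sort_array(F.collect_list(F.struct("id", "col3", "col4"))).alias("s")) \
    .select("col1", "col2",
            F.array_join(F.col("s.col3"), ",").alias("col3"),
            F.array_join(F.col("s.col4"), ",").alias("col4"))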
Use array_distinct on top of collect_list to get distinct values while maintaining order.
#InputDF
# +----+----+----+-------+
# |col1|col2|col3|   col4|
# +----+----+----+-------+
# |   1|   A|  U1|  12345|
# |   1|   A|  A1|549BZ4G|
# |   1|   A|  U1| 123456|
# +----+----+----+-------+

df1 = df.withColumn("id", F.monotonically_increasing_id()) \
    .groupby("col1", "col2") \
    .agg(F.array_distinct(F.collect_list("col4")).alias("col4"),
         F.array_distinct(F.collect_list("col3")).alias("col3"))

df1.select("col1", "col2",
           F.array_join("col3", ",").alias("col3"),
           F.array_join("col4", ",").alias("col4")).show(truncate=False)

# +----+----+-----+--------------------+
# |col1|col2|col3 |col4                |
# +----+----+-----+--------------------+
# |1   |A   |U1,A1|12345,549BZ4G,123456|
# +----+----+-----+--------------------+
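This works because array_distinct keeps the first occurrence of each element, so the order produced by collect_list survives the deduplication.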
Sorting a collect_set by count
No, there is no method to order collect_set by count: collect aggregation methods don't count items, so that information is not available for sorting.
However, since Spark 3.1, given a dataframe with two columns id and item, you can:
- perform count over a groupBy on columns id and item
- collect the (count, item) pairs into an array with collect_list and struct. Note: you could use collect_set here instead of collect_list, but it is unnecessary, as each (count, item) pair is guaranteed to be unique
- use sort_array to sort the array by descending count
- map the array with transform to drop count.
This translates to the following code:
from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
    .groupBy('id') \
    .agg(
        F.transform(
            F.sort_array(
                F.collect_list(F.struct("count", "item")),
                asc=False
            ),
            lambda x: x.getItem('item')
        ).alias('popular_items')
    )
Note: if your Spark version is lower than 3.1 but greater than 1.6, you can replace transform with withColumn as follows:
from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
    .groupBy('id') \
    .agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \
    .withColumn("popular_items", F.col('popular_items.item'))
Example
With the following input dataframe:
+---+-----+
|id |item |
+---+-----+
|1 |item1|
|1 |item2|
|1 |item2|
|1 |item2|
|1 |item3|
|2 |item3|
|2 |item3|
|2 |item1|
|3 |item1|
|3 |item1|
+---+-----+
You get the following output:
+---+---------------------+
|id |popular_items |
+---+---------------------+
|1 |[item2, item3, item1]|
|3 |[item1] |
|2 |[item3, item1] |
+---+---------------------+
Groupby column and create lists for another column values in pyspark
You can preserve ordering by applying collect_list over a window function. In this case the window is partitioned by user and ordered by score descending.
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Window as W

dummy = pd.DataFrame([[1047, 2021, 0.38], [1056, 2021, 0.19]],
                     columns=['reco', 'user', 'score'])
df = spark.createDataFrame(dummy)

window_spec = W.partitionBy("user").orderBy(F.desc("score"))
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

df.withColumn("rn", F.row_number().over(window_spec))\
  .withColumn("reco", F.collect_list("reco").over(ranged_spec))\
  .withColumn("score", F.collect_list("score").over(ranged_spec))\
  .where("rn == 1")\
  .drop("rn").show()
Output
+------------+----+------------+
| reco|user| score|
+------------+----+------------+
|[1047, 1056]|2021|[0.38, 0.19]|
+------------+----+------------+
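The ranged_spec frame is what makes every row in the partition see the complete lists: with orderBy alone, the default frame runs from the start of the partition to the current row only, so the collect_list columns would hold growing prefixes instead of the full lists.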