Collect_List by Preserving Order Based on Another Variable

collect_list by preserving order based on another variable

If you collect both dates and values as a list, you can sort the resulting column according to date using a UDF, and then keep only the values in the result.

import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df.groupby("id") \
    .agg(F.collect_list(F.struct("date", "value")).alias("list_col"))

# define udf
def sorter(l):
    res = sorted(l, key=operator.itemgetter(0))
    return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df.select("id", sort_udf("list_col").alias("sorted_list")) \
    .show(truncate=False)
+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+
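
A minimal sketch of the same idea without a UDF, assuming the same input_df: sort_array orders the collected structs by their first field (the date), and the dot path pulls the value field back out as an array.

import pyspark.sql.functions as F

# sort the (date, value) structs by date, then extract only the values
no_udf_df = input_df.groupby("id") \
    .agg(F.sort_array(F.collect_list(F.struct("date", "value"))).alias("sorted")) \
    .select("id", F.col("sorted.value").alias("sorted_list"))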

Groupby and collect_list maintaining order based on another column in PySpark

You can try:

spark_df.orderBy('dateCol3', ascending=True).groupBy('dateCol1', 'dateCol2').agg(F.collect_list('Name'))

Alternatively, although it would be a bit of overkill, you can use windowing:

from pyspark.sql import Window as w

spark_df.select(
    'dateCol1', 'dateCol2',
    F.collect_list('Name').over(
        w.partitionBy(['dateCol1', 'dateCol2']).orderBy(F.col('dateCol3'))
    ).alias('Name')
).distinct()
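
For reference, a fuller sketch of the windowing variant, assuming the same spark_df columns; the explicit unbounded frame makes every row carry the complete ordered list, so distinct() collapses each group to a single row:

from pyspark.sql import functions as F
from pyspark.sql import Window

# full-frame window: every row of a (dateCol1, dateCol2) group gets the complete
# list of Name values ordered by dateCol3
full_w = (Window.partitionBy('dateCol1', 'dateCol2')
                .orderBy('dateCol3')
                .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

spark_df.select(
    'dateCol1', 'dateCol2',
    F.collect_list('Name').over(full_w).alias('Name')
).distinct()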

How to maintain sort order in PySpark collect_list and collect multiple lists

Yes, the correct way is to add successive .withColumn statements, followed by an .agg statement that collapses the duplicated rows, keeping only the complete array for each group.

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data \
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)) \
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w)) \
    .groupBy('Syscode_Stn') \
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
         F.max('MarchMadInd').alias('MarchMadInd'))
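
An alternative sketch that avoids the window entirely, assuming the same train_data columns: collect the ordering column together with the values in a struct, sort once, then pull each field out of the sorted array (the same dot-path trick used later on this page):

from pyspark.sql import functions as F

# collect (date, value1, value2) structs, sort them by the date field, then
# extract each value field as an ordered array
alt_df = train_data.groupBy('Syscode_Stn') \
    .agg(F.sort_array(F.collect_list(
        F.struct('tuning_evnt_start_dt', 'spp_imp_daily', 'MarchMadInd'))).alias('sorted')) \
    .select('Syscode_Stn',
            F.col('sorted.spp_imp_daily').alias('spp_imp_daily'),
            F.col('sorted.MarchMadInd').alias('MarchMadInd'))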

Preserve order based on another variable when calling collect_list using sparklyr

Solved! I misunderstood how collect_list() and Spark SQL could work together. I didn't realize a list could be returned; I thought the concatenation had to take place within the query. The following produces the desired result:

spark_output <- spark_session(sc) %>%
  sparklyr::invoke("sql",
                   "SELECT userid, collect_list(city)
                    OVER (PARTITION BY userid
                          ORDER BY date
                          ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
                    AS cities
                    FROM my_sdf") %>%
  sdf_register() %>%
  group_by(userid) %>%
  filter(row_number(userid) == 1) %>%
  ungroup() %>%
  mutate(cities = paste(cities, sep = " > ")) %>%
  sdf_register()

Pyspark - Preserve order of collect list and collect set over multiple columns

Use Spark's monotonically_increasing_id function to maintain the order; the Spark API documentation describes it in more detail.

# Input DF
# +----+----+----+-------+
# |col1|col2|col3|   col4|
# +----+----+----+-------+
# |   1|   A|  U1|  12345|
# |   1|   A|  A1|549BZ4G|
# +----+----+----+-------+

from pyspark.sql import functions as F

df1 = df.withColumn("id", F.monotonically_increasing_id()).groupby("col1", "col2") \
    .agg(F.collect_list("col4").alias("col4"), F.collect_list("col3").alias("col3"))

df1.select("col1", "col2", F.array_join("col3", ",").alias("col3"),
           F.array_join("col4", ",").alias("col4")).show()

# Output DF
# +----+----+-----+-------------+
# |col1|col2| col3|         col4|
# +----+----+-----+-------------+
# |   1|   A|U1,A1|12345,549BZ4G|
# +----+----+-----+-------------+

Use array_distinct on top of collect_list to get distinct values while maintaining order.

# Input DF
# +----+----+----+-------+
# |col1|col2|col3|   col4|
# +----+----+----+-------+
# |   1|   A|  U1|  12345|
# |   1|   A|  A1|549BZ4G|
# |   1|   A|  U1| 123456|
# +----+----+----+-------+

df1 = df.withColumn("id", F.monotonically_increasing_id()).groupby("col1", "col2") \
    .agg(F.array_distinct(F.collect_list("col4")).alias("col4"),
         F.array_distinct(F.collect_list("col3")).alias("col3"))

df1.select("col1", "col2", F.array_join("col3", ",").alias("col3"),
           F.array_join("col4", ",").alias("col4")).show(truncate=False)

# +----+----+-----+--------------------+
# |col1|col2|col3 |col4                |
# +----+----+-----+--------------------+
# |1   |A   |U1,A1|12345,549BZ4G,123456|
# +----+----+-----+--------------------+
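
Note that monotonically_increasing_id is only generated here. If you want the collected order to depend on it explicitly rather than on the incoming row order, one way to make that explicit (a sketch, assuming the same df) is to collect the id alongside each value and sort on it:

from pyspark.sql import functions as F

# carry the generated id into the struct so the collected values can be
# sorted on it explicitly
df1 = df.withColumn("id", F.monotonically_increasing_id()) \
    .groupby("col1", "col2") \
    .agg(F.sort_array(F.collect_list(F.struct("id", "col4"))).alias("c4"),
         F.sort_array(F.collect_list(F.struct("id", "col3"))).alias("c3")) \
    .select("col1", "col2",
            F.col("c3.col3").alias("col3"),
            F.col("c4.col4").alias("col4"))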

Sorting a collect_set by count

No, there is no way to order collect_set by count: collect aggregate functions do not count items, so the information needed to sort them is not available.

However, since Spark 3.1, given a dataframe with two columns id and item, you can:

  1. perform a count over a groupBy on the columns id and item
  2. collect the (count, item) pairs into an array with collect_list and struct. Note: you could use collect_set here instead of collect_list, but it is unnecessary, since each (count, item) pair is already unique
  3. use sort_array to sort the array by descending count
  4. map the array with transform to drop the count.

This translates to the following code:

from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
    .groupBy('id') \
    .agg(
        F.transform(
            F.sort_array(
                F.collect_list(F.struct("count", "item")),
                asc=False
            ),
            lambda x: x.getItem('item')
        ).alias('popular_items')
    )

Note: if your Spark version is lower than 3.1 but at least 1.6, you can replace transform with withColumn as follows:

from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
    .groupBy('id') \
    .agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \
    .withColumn("popular_items", F.col('popular_items.item'))

Example

With the following input dataframe:

+---+-----+
|id |item |
+---+-----+
|1  |item1|
|1  |item2|
|1  |item2|
|1  |item2|
|1  |item3|
|2  |item3|
|2  |item3|
|2  |item1|
|3  |item1|
|3  |item1|
+---+-----+

You get the following output:

+---+---------------------+
|id |popular_items        |
+---+---------------------+
|1  |[item2, item3, item1]|
|3  |[item1]              |
|2  |[item3, item1]       |
+---+---------------------+

Groupby column and create lists for another column values in pyspark

You can preserve ordering by applying collect_list over a window function. In this case the window is partitioned by user, ordered by score descending, and given an unbounded frame so that every row carries the full list.

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Window as W

dummy = pd.DataFrame([[1047,2021,0.38],[1056,2021,0.19]],columns=['reco','user','score'])

df = spark.createDataFrame(dummy)

window_spec = W.partitionBy("user").orderBy(F.desc("score"))
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

df.withColumn("reco", F.collect_list("reco").over(window_spec))\
.withColumn("score", F.collect_list("score").over(window_spec))\
.withColumn("rn", F.row_number().over(window_spec))\
.where("rn == 1")\
.drop("rn").show()

Output

+------------+----+------------+
|        reco|user|       score|
+------------+----+------------+
|[1047, 1056]|2021|[0.38, 0.19]|
+------------+----+------------+
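
A window-free sketch of the same result, assuming the same df: sort the collected (score, reco) structs by score descending and then split the fields back out:

from pyspark.sql import functions as F

# sort (score, reco) structs by score descending, then extract each field
df.groupBy("user") \
  .agg(F.sort_array(F.collect_list(F.struct("score", "reco")), asc=False).alias("s")) \
  .select("user",
          F.col("s.reco").alias("reco"),
          F.col("s.score").alias("score")) \
  .show()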

