How to Update a Pyspark Dataframe With New Values from Another Dataframe

How to update a pyspark dataframe with new values from another dataframe?

This is closely related to updating a DataFrame column with new values, except that you also want to add the rows from DataFrame B. One approach is to first do what is outlined in that related question (a left join that takes B's value when it is not null and A's value otherwise), then union the result with DataFrame B and drop duplicates.

For example:

import pyspark.sql.functions as f

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        # prefer B's value when present, otherwise keep A's
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

Or more generically using a list comprehension if you have a lot of columns to replace and you don't want to hard code them all:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *(
            ['col_1'] +
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] +
            ['b.col_3']
        )
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()

How to update a dataframe in PySpark with random values from another dataframe?

If b is small (here, 3 rows), you can collect it into a Python list and add it to a as an array column. Shuffling that array with F.shuffle (available since Spark 2.4) and taking the first element then gives each row a random value.

import pyspark.sql.functions as F

df = a.withColumn(
    'Zip',
    F.shuffle(
        F.array(*[F.lit(r[0]) for r in b.collect()])
    )[0]
)

df.show()
#+----+-----+
#|Name|  Zip|
#+----+-----+
#|   a|06901|
#|   b|06905|
#|   c|06902|
#|   d|06901|
#+----+-----+

Set column status based on another dataframe column value pyspark

Use a left join and a when expression to create a new column, cat2_status, that records whether each row has any match in support_df:

from pyspark.sql import functions as F


result = main_df.alias("main").join(
    support_df.alias("supp"),
    (F.col("supp.cat") == "cat2") &
    ((F.col("main.cat2") == F.col("supp.value1")) |
     (F.col("main.cat2") == F.col("supp.value2"))),
    "left"
).select(
    "main.*",
    F.when(
        F.col("supp.cat").isNotNull(), "Matched"
    ).otherwise("NotMatched").alias("cat2_status")
)

result.show()

#+----+----+----+-----------+
#|cat1|cat2|cat3|cat2_status|
#+----+----+----+-----------+
#|   a|   9|   e| NotMatched|
#|   b|   3|   f|    Matched|
#|   c|  11|   g| NotMatched|
#|   d|   6|   h|    Matched|
#+----+----+----+-----------+

Update a pyspark dataframe column, based on values match in another pyspark dataframe column

This is a simple left join. The following should work (note that the join type must be passed as a string):

df2.join(df1, "address", "left").show()

Update values in a column based on values of another data frame's column values in PySpark

The first thing you want to do is explode the values in df2.items2 so that contents of the arrays will be on separate rows:

from pyspark.sql.functions import explode
df2 = df2.select(explode("items2").alias("items2"))
df2.show()
#+------+
#|items2|
#+------+
#|     B|
#|     A|
#|     C|
#|     E|
#+------+

(This assumes that the values in df2.items2 are distinct. If they are not, you would need to add df2 = df2.distinct().)

Option 1: Use crossJoin:

Now you can crossJoin the new df2 back to df1 and keep only the rows where df1.items1 contains an element of df2.items2. We can do this with pyspark.sql.functions.array_contains, wrapped in expr so that a column value can be passed as the second argument.

After filtering, group by id1 and items1 and aggregate using pyspark.sql.functions.collect_list:

from pyspark.sql.functions import expr, collect_list

df1.alias("l").crossJoin(df2.alias("r"))\
    .where(expr("array_contains(l.items1, r.items2)"))\
    .groupBy("l.id1", "l.items1")\
    .agg(collect_list("r.items2").alias("items1_updated"))\
    .show()
#+---+---------------+--------------+
#|id1|         items1|items1_updated|
#+---+---------------+--------------+
#|  1|      [E, A, C]|     [A, C, E]|
#|  0|   [B, C, D, E]|     [B, C, E]|
#|  4|[A, C, E, B, D]|  [B, A, C, E]|
#|  3|      [E, G, A]|        [A, E]|
#|  2|   [F, A, E, B]|     [B, A, E]|
#+---+---------------+--------------+

Option 2: Explode df1.items1 and left join:

Another option is to explode the contents of items1 in df1 and do a left join. After the join, we group and aggregate much as above. This works because collect_list ignores the null values introduced by the non-matching rows. Note that collect_list does not guarantee element order, so the rebuilt items1 may be ordered differently from the original array.

df1.withColumn("items1", explode("items1")).alias("l")\
    .join(df2.alias("r"), on=expr("l.items1=r.items2"), how="left")\
    .groupBy("l.id1")\
    .agg(
        collect_list("l.items1").alias("items1"),
        collect_list("r.items2").alias("items1_updated")
    ).show()
#+---+---------------+--------------+
#|id1|         items1|items1_updated|
#+---+---------------+--------------+
#|  0|   [E, B, D, C]|     [E, B, C]|
#|  1|      [E, C, A]|     [E, C, A]|
#|  3|      [E, A, G]|        [E, A]|
#|  2|   [F, E, B, A]|     [E, B, A]|
#|  4|[E, B, D, C, A]|  [E, B, C, A]|
#+---+---------------+--------------+

