How to update a pyspark dataframe with new values from another dataframe?
This is closely related to "update a dataframe column with new values", except that you also want to add the rows from DataFrame B. One approach is to do what is outlined in that linked question and then union the result with DataFrame B and drop duplicates.
For example:
import pyspark.sql.functions as f

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        # take the updated value from dfB where one exists,
        # otherwise keep the original value from dfA
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+
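If you want to reproduce this, a pair of hypothetical inputs consistent with the output above (an assumption, since the answer does not repeat the question's data) would be:

# Assumed example data: dfA holds the original values, dfB carries the
# updated values plus one brand-new row ('d').
dfA = spark.createDataFrame(
    [('a', 'www'), ('b', 'eee'), ('c', 'rrr')],
    ['col_1', 'col_2']
)
dfB = spark.createDataFrame(
    [('a', 'wew', 1), ('c', 'rer', 3), ('d', 'yyy', 2)],
    ['col_1', 'col_2', 'col_3']
)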
Or, more generically, use a list comprehension if you have a lot of columns to replace and you don't want to hard-code them all:
cols_to_update = ['col_2']
dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *(
            ['col_1'] +
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] +
            ['b.col_3']
        )
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
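As a side note, the when/~isnull pattern is really just "take the first non-null value", so the same update can be written more compactly with f.coalesce. A sketch, under the same assumed schema:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        # coalesce returns its first non-null argument: b.col_2 wins
        # when dfB has an update, a.col_2 is the fallback
        f.coalesce('b.col_2', 'a.col_2').alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()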
How to update a dataframe in PySpark with random values from another dataframe?
If b is small (3 rows), you can just collect it into a Python list, add it as an array column to a, and then pick a random element using shuffle. Note that shuffle requires Spark 2.4+.
import pyspark.sql.functions as F

# Collect the zip codes from b, build them into a literal array column,
# then shuffle the array and take its first element for each row of a.
df = a.withColumn(
    'Zip',
    F.shuffle(
        F.array(*[F.lit(r[0]) for r in b.collect()])
    )[0]
)
df.show()
+----+-----+
|Name|  Zip|
+----+-----+
|   a|06901|
|   b|06905|
|   c|06902|
|   d|06901|
+----+-----+
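For reference, hypothetical inputs that line up with this output (assumed, since the question's data is not shown here) could be:

# Assumed example data: four names in a, three zip codes in b
# (kept as strings to preserve the leading zeros).
a = spark.createDataFrame([('a',), ('b',), ('c',), ('d',)], ['Name'])
b = spark.createDataFrame([('06901',), ('06902',), ('06905',)], ['Zip'])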
Set column status based on another dataframe column value pyspark
Use a left join and a when expression to create the new column cat2_status depending on whether there is a match in support_df:
from pyspark.sql import functions as F
result = main_df.alias("main").join(
    support_df.alias("supp"),
    # match only the 'cat2' rows of support_df, on either value column
    (F.col("supp.cat") == "cat2") &
    ((F.col("main.cat2") == F.col("supp.value1")) |
     (F.col("main.cat2") == F.col("supp.value2"))),
    "left"
).select(
    "main.*",
    # a non-null supp.cat after the left join means the row matched
    F.when(
        F.col("supp.cat").isNotNull(), "Matched"
    ).otherwise("NotMatched").alias("cat2_status")
)
result.show()
#+----+----+----+-----------+
#|cat1|cat2|cat3|cat2_status|
#+----+----+----+-----------+
#|   a|   9|   e| NotMatched|
#|   b|   3|   f|    Matched|
#|   c|  11|   g| NotMatched|
#|   d|   6|   h|    Matched|
#+----+----+----+-----------+
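A hypothetical pair of inputs that would produce this output (an assumption based on the result shown):

# Assumed example data: support_df carries one rule row for 'cat2'
# whose value1/value2 list the cat2 values that count as a match.
main_df = spark.createDataFrame(
    [('a', 9, 'e'), ('b', 3, 'f'), ('c', 11, 'g'), ('d', 6, 'h')],
    ['cat1', 'cat2', 'cat3']
)
support_df = spark.createDataFrame(
    [('cat2', 3, 6)],
    ['cat', 'value1', 'value2']
)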
Update a pyspark dataframe column, based on values match in another pyspark dataframe column
This is a simple left join. This should work (note that the join type must be passed as a string):
df2.join(df1, "address", "left").show()
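To make that concrete, a sketch with two hypothetical DataFrames sharing an address column (names and values assumed):

df1 = spark.createDataFrame(
    [('12 Main St', 'NY'), ('34 Oak Ave', 'CT')],
    ['address', 'state']
)
df2 = spark.createDataFrame(
    [('12 Main St',), ('56 Elm Rd',)],
    ['address']
)
# every row of df2 is kept; matching rows pick up df1's columns,
# non-matching rows get nulls
df2.join(df1, "address", "left").show()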
Update values in a column based on values of another data frame's column values in PySpark
The first thing you want to do is explode the values in df2.items2 so that the contents of the arrays are on separate rows:
from pyspark.sql.functions import explode
df2 = df2.select(explode("items2").alias("items2"))
df2.show()
#+------+
#|items2|
#+------+
#|     B|
#|     A|
#|     C|
#|     E|
#+------+
(This assumes that the values in df2.items2 are distinct; if not, you would need to add df2 = df2.distinct().)
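Before moving on: if you want to reproduce the steps, hypothetical original inputs consistent with the outputs in this answer (assumed, since the question's data is not repeated here) could be:

# Assumed example data: df1 has an array of items per id, df2 has a
# single row whose array lists the allowed items.
df1 = spark.createDataFrame(
    [(0, ['B', 'C', 'D', 'E']), (1, ['E', 'A', 'C']),
     (2, ['F', 'A', 'E', 'B']), (3, ['E', 'G', 'A']),
     (4, ['A', 'C', 'E', 'B', 'D'])],
    ['id1', 'items1']
)
df2 = spark.createDataFrame([(['B', 'A', 'C', 'E'],)], ['items2'])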
Option 1: Use crossJoin:
Now you can crossJoin the new df2 back to df1 and keep only the rows where df1.items1 contains an element of df2.items2. We can achieve this with pyspark.sql.functions.array_contains, using expr so that a column (r.items2) can be passed as the value argument. After filtering, group by id1 and items1 and aggregate using pyspark.sql.functions.collect_list:
from pyspark.sql.functions import expr, collect_list
df1.alias("l").crossJoin(df2.alias("r"))\
.where(expr("array_contains(l.items1, r.items2)"))\
.groupBy("l.id1", "l.items1")\
.agg(collect_list("r.items2").alias("items1_updated"))\
.show()
#+---+---------------+--------------+
#|id1|         items1|items1_updated|
#+---+---------------+--------------+
#|  1|      [E, A, C]|     [A, C, E]|
#|  0|   [B, C, D, E]|     [B, C, E]|
#|  4|[A, C, E, B, D]|  [B, A, C, E]|
#|  3|      [E, G, A]|        [A, E]|
#|  2|   [F, A, E, B]|     [B, A, E]|
#+---+---------------+--------------+
Option 2: Explode df1.items1 and left join:
Another option is to explode the contents of items1 in df1 and do a left join. After the join, we have to do a similar group by and aggregation as above. This works because collect_list ignores the null values introduced by the non-matching rows:
df1.withColumn("items1", explode("items1")).alias("l")\
    .join(df2.alias("r"), on=expr("l.items1 = r.items2"), how="left")\
    .groupBy("l.id1")\
    .agg(
        collect_list("l.items1").alias("items1"),
        collect_list("r.items2").alias("items1_updated")
    ).show()
#+---+---------------+--------------+
#|id1|         items1|items1_updated|
#+---+---------------+--------------+
#|  0|   [E, B, D, C]|     [E, B, C]|
#|  1|      [E, C, A]|     [E, C, A]|
#|  3|      [E, A, G]|        [E, A]|
#|  2|   [F, E, B, A]|     [E, B, A]|
#|  4|[E, B, D, C, A]|  [E, B, C, A]|
#+---+---------------+--------------+
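As a side note, on Spark 2.4+ you could avoid the join entirely with array_intersect, by collecting the (small) df2 into a literal array first. A sketch, assuming df2 has already been exploded as above and is small enough to collect:

from pyspark.sql.functions import array, array_intersect, lit

# Build a literal array from df2's values and intersect it with items1.
# The element order follows items1, so it may differ from the
# join-based outputs above.
allowed = [r['items2'] for r in df2.collect()]
df1.withColumn(
    'items1_updated',
    array_intersect('items1', array(*[lit(x) for x in allowed]))
).show()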