PySpark: Replace All Values in a DataFrame with Other Values

PySpark: replace a value in several columns at once

Since there can be on the order of 30 to 100 columns, let's add a few more columns to the DataFrame to generalize the example.

# Loading the requisite packages
from pyspark.sql.functions import col, when
df = sc.parallelize([(1, "foo", "val", "baz", "gun", "can", "baz", "buz", "oof"),
                     (2, "bar", "baz", "baz", "baz", "got", "pet", "stu", "got"),
                     (3, "baz", "buz", "pun", "iam", "you", "omg", "sic", "baz")]
                    ).toDF(["x", "y", "z", "a", "b", "c", "d", "e", "f"])
df.show()
+---+---+---+---+---+---+---+---+---+
| x| y| z| a| b| c| d| e| f|
+---+---+---+---+---+---+---+---+---+
| 1|foo|val|baz|gun|can|baz|buz|oof|
| 2|bar|baz|baz|baz|got|pet|stu|got|
| 3|baz|buz|pun|iam|you|omg|sic|baz|
+---+---+---+---+---+---+---+---+---+

Let's say we want to replace baz with null in all columns except x and a. Use a list comprehension to choose the columns where the replacement has to be done.

# This contains the list of columns where we apply replace() function
all_column_names = df.columns
print(all_column_names)
['x', 'y', 'z', 'a', 'b', 'c', 'd', 'e', 'f']
columns_to_remove = ['x','a']
columns_for_replacement = [i for i in all_column_names if i not in columns_to_remove]
print(columns_for_replacement)
['y', 'z', 'b', 'c', 'd', 'e', 'f']

Finally, do the replacement using when(), which is essentially PySpark's equivalent of an if clause.

# Doing the replacement on all the requisite columns
for i in columns_for_replacement:
    df = df.withColumn(i, when(col(i) == 'baz', None).otherwise(col(i)))
df.show()
+---+----+----+---+----+---+----+---+----+
| x| y| z| a| b| c| d| e| f|
+---+----+----+---+----+---+----+---+----+
| 1| foo| val|baz| gun|can|null|buz| oof|
| 2| bar|null|baz|null|got| pet|stu| got|
| 3|null| buz|pun| iam|you| omg|sic|null|
+---+----+----+---+----+---+----+---+----+
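As an aside, the loop can be collapsed into a single select(), which keeps the query plan flat; a minimal sketch, assuming the original df and the column lists built above:

# Same replacement in one pass; columns not in columns_for_replacement pass through untouched
df = df.select(
    *[when(col(c) == 'baz', None).otherwise(col(c)).alias(c)
      if c in columns_for_replacement else col(c)
      for c in all_column_names]
)
df.show()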

There is no need to create a UDF and define a function to do the replacement when it can be done with a normal if-else clause. UDFs are in general a costly operation and should be avoided whenever possible.

Replace all values of a column in a dataframe with pyspark

It might be easier to use lit as follows:

from pyspark.sql.functions import lit
new_df = df.withColumn('column_name', lit(10))
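lit() is not limited to integers; a quick sketch, assuming the same df and a string column named column_name, showing a string constant and a typed null:

from pyspark.sql.functions import lit

# Every row of column_name becomes the string 'constant'
new_df = df.withColumn('column_name', lit('constant'))

# Or overwrite the column with a typed null
new_df = df.withColumn('column_name', lit(None).cast('string'))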

Pyspark: Replace all occurrences of a value with null in dataframe

Another, less verbose way to do this is to use replace:

pyspark_df.replace(-1,None).replace('-1',None).show()
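replace() also accepts a subset of columns and a dict mapping; a short sketch with hypothetical column names col_a and col_b:

# Restrict the replacement to specific columns
pyspark_df.replace(-1, None, subset=['col_a', 'col_b']).show()

# Or map several values to their replacements at once
pyspark_df.replace({-1: None, -99: None}).show()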

How to replace a particular value in a Pyspark Dataframe column with another value?

You can merge multiple isin conditions into one:

from pyspark.sql import functions as F

(df
    .withColumn('aa',
        F.when(F.col('A').isin(['OTH/CON', 'Freight Collect']), F.lit('Collect'))
         .when(F.col('A').isin(['DBG']), F.lit('Dispose'))
         .otherwise(F.col('A'))
    )
    .show()
)

+---------------+---+-------+
| A| B| aa|
+---------------+---+-------+
| OTH/CON| 2|Collect|
|Freight Collect| 3|Collect|
| OTH/CON| 4|Collect|
| DBG| 5|Dispose|
+---------------+---+-------+
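When the mapping grows, the when() chain can be built from a plain dict instead of being written out by hand; a minimal sketch, assuming the same df (the mapping dict just restates the values from the example above):

from pyspark.sql import functions as F

# Source value -> replacement label, taken from the example above
mapping = {'OTH/CON': 'Collect', 'Freight Collect': 'Collect', 'DBG': 'Dispose'}

expr = None
for src, dst in mapping.items():
    cond = F.col('A') == src
    expr = F.when(cond, F.lit(dst)) if expr is None else expr.when(cond, F.lit(dst))

df.withColumn('aa', expr.otherwise(F.col('A'))).show()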

Replace a column value based on other column values in a PySpark data frame

You can use the SQL function DATE_ADD via expr, like this:

from pyspark.sql import functions as F

(
    df
    .withColumn("Value", F.col("Value").cast("int"))
    .withColumn("Date_2", F.expr('DATE_ADD(Date_1, Value - 1)'))
)

DATE_ADD(Date_1, Value - 1) adds, for each row, Value - 1 days to the date in the Date_1 column.

Additionally (if you haven't done it yet), the Value column should be cast to INT; if it were, for example, of DOUBLE type, an AnalysisException would occur.
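For reference, a minimal self-contained sketch of the same idea; the column names Date_1 and Value come from the question, while the sample rows are made up for illustration. Recent Spark versions also accept a Column for the days argument of F.date_add, but F.expr works everywhere:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Made-up sample rows
df = spark.createDataFrame(
    [("2024-01-01", "3"), ("2024-01-10", "1")],
    ["Date_1", "Value"],
)

result = (
    df
    .withColumn("Date_1", F.col("Date_1").cast("date"))
    .withColumn("Value", F.col("Value").cast("int"))
    .withColumn("Date_2", F.expr("DATE_ADD(Date_1, Value - 1)"))
)
result.show()  # Date_2 is 2024-01-03 and 2024-01-10 respectively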

pyspark dataframe search and replace multiple values

NB: in these examples I renamed the columns find to colfind and replace to colreplace.

Approach 1

Recommended when df1 is relatively small; of the two, this approach is the more robust. We use a UDF to replace values:

from pyspark.sql import functions as F
from pyspark.sql import Window

replacement_map = {}
for row in df1.collect():
    replacement_map[row.colfind] = row.colreplace

@F.udf()
def find_and_replace(column_value):
    for colfind in replacement_map:
        column_value = column_value.replace(colfind, replacement_map[colfind])
    return column_value

df2.withColumn("Replaced_Name", find_and_replace(F.col("Name"))).show()

Outputs:

+-----------------+--------------------+
| Name| Replaced_Name|
+-----------------+--------------------+
| ra'in| raddin|
| check)| checkbb|
| human be(ing| humanual beaaing|
|OP.86-1_0743 test|OPff86gg1ii0743 test|
| a,b.| aeebff|
| v(alue-1| vaaaluegg1|
+-----------------+--------------------+
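One caveat: the UDF above assumes Name is never null; a None value would raise an AttributeError in the Python worker. A defensive variant (my addition, not part of the original answer):

@F.udf()
def find_and_replace(column_value):
    # Pass nulls through untouched instead of crashing the worker
    if column_value is None:
        return None
    for colfind, colreplace in replacement_map.items():
        column_value = column_value.replace(colfind, colreplace)
    return column_value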

Approach 2

If you split the Name column into one row per character and join against your dataframe of replacements, this task can be done as shown below:

NB: this approach is better suited to single-character replacements.

df_replaced = (
    df2.alias("df2")
    .select(
        F.col("Name"),
        F.posexplode(F.split("Name", ''))
    )
    .join(
        df1.alias("df1"),
        on=(
            (F.col("col") == F.col("df1.colfind"))
            | (
                F.col("Name").contains(F.col("df1.colfind"))
                & (F.col("df1.colfind").substr(0, 1) == F.col("col"))
            )
        ),
        how="left"
    )
    .select(
        F.col("Name"),
        F.concat_ws(
            '',
            F.collect_list(
                F.coalesce(F.col("df1.colreplace"), F.col("col"))
            ).over(Window.partitionBy("Name").orderBy("pos"))
        ).alias("Replaced_Name"),
        F.row_number().over(
            Window.partitionBy("Name").orderBy(F.col("pos").desc())
        ).alias("rn")
    )
    .where("rn=1")
    .select("Name", "Replaced_Name")
)

df_replaced.show()

Outputs:

+-----------------+--------------------+
|Name |Replaced_Name |
+-----------------+--------------------+
|OP.86-1_0743 test|OPff86gg1ii0743 test|
|a,b. |aeebff |
|check) |checkbb |
|human be(ing |humanualan beaaing |
|ra'in |raddin |
|v(alue-1 |vaaaluegg1 |
+-----------------+--------------------+

Debugging Output

The following output aligns with the latest update of the question this answer responds to (i.e. the OP may have changed the data used in the question).

df1 :

+-------+----------+
|colfind|colreplace|
+-------+----------+
| ,| ee|
| .| ff|
| —| ii|
| man| manual|
| )| bb|
| -| gg|
| ""| cc|
| '| dd|
| _| ii|
| (| aa|
| sunday| holiday|
+-------+----------+

df2 :

+-----------------+
| Name|
+-----------------+
| ra'in|
| check)|
| human be(ing|
|OP.86-1_0743 test|
| a,b.|
| v(alue-1|
+-----------------+

Intermediate output (before the rn = 1 filter and final select)

+-----------------+---+---+----------+-------+--------------------+---+
|Name |pos|col|colreplace|colfind|Replaced_Name |rn |
+-----------------+---+---+----------+-------+--------------------+---+
|OP.86-1_0743 test|0 |O |null |null |O |18 |
|OP.86-1_0743 test|1 |P |null |null |OP |17 |
|OP.86-1_0743 test|2 |. |ff |. |OPff |16 |
|OP.86-1_0743 test|3 |8 |null |null |OPff8 |15 |
|OP.86-1_0743 test|4 |6 |null |null |OPff86 |14 |
|OP.86-1_0743 test|5 |- |gg |- |OPff86gg |13 |
|OP.86-1_0743 test|6 |1 |null |null |OPff86gg1 |12 |
|OP.86-1_0743 test|7 |_ |ii |_ |OPff86gg1ii |11 |
|OP.86-1_0743 test|8 |0 |null |null |OPff86gg1ii0 |10 |
|OP.86-1_0743 test|9 |7 |null |null |OPff86gg1ii07 |9 |
|OP.86-1_0743 test|10 |4 |null |null |OPff86gg1ii074 |8 |
|OP.86-1_0743 test|11 |3 |null |null |OPff86gg1ii0743 |7 |
|OP.86-1_0743 test|12 | |null |null |OPff86gg1ii0743 |6 |
|OP.86-1_0743 test|13 |t |null |null |OPff86gg1ii0743 t |5 |
|OP.86-1_0743 test|14 |e |null |null |OPff86gg1ii0743 te |4 |
|OP.86-1_0743 test|15 |s |null |null |OPff86gg1ii0743 tes |3 |
|OP.86-1_0743 test|16 |t |null |null |OPff86gg1ii0743 test|2 |
|OP.86-1_0743 test|17 | |null |null |OPff86gg1ii0743 test|1 |
|a,b. |0 |a |null |null |a |5 |
|a,b. |1 |, |ee |, |aee |4 |
|a,b. |2 |b |null |null |aeeb |3 |
|a,b. |3 |. |ff |. |aeebff |2 |
|a,b. |4 | |null |null |aeebff |1 |
|check) |0 |c |null |null |c |7 |
|check) |1 |h |null |null |ch |6 |
|check) |2 |e |null |null |che |5 |
|check) |3 |c |null |null |chec |4 |
|check) |4 |k |null |null |check |3 |
|check) |5 |) |bb |) |checkbb |2 |
|check) |6 | |null |null |checkbb |1 |
|human be(ing |0 |h |null |null |h |13 |
|human be(ing |1 |u |null |null |hu |12 |
|human be(ing |2 |m |manual |man |humanual |11 |
|human be(ing |3 |a |null |null |humanuala |10 |
|human be(ing |4 |n |null |null |humanualan |9 |
|human be(ing |5 | |null |null |humanualan |8 |
|human be(ing |6 |b |null |null |humanualan b |7 |
|human be(ing |7 |e |null |null |humanualan be |6 |
|human be(ing |8 |( |aa |( |humanualan beaa |5 |
|human be(ing |9 |i |null |null |humanualan beaai |4 |
|human be(ing |10 |n |null |null |humanualan beaain |3 |
|human be(ing |11 |g |null |null |humanualan beaaing |2 |
|human be(ing |12 | |null |null |humanualan beaaing |1 |
|ra'in |0 |r |null |null |r |6 |
|ra'in |1 |a |null |null |ra |5 |
|ra'in |2 |' |dd |' |radd |4 |
|ra'in |3 |i |null |null |raddi |3 |
|ra'in |4 |n |null |null |raddin |2 |
|ra'in |5 | |null |null |raddin |1 |
|v(alue-1 |0 |v |null |null |v |9 |
|v(alue-1 |1 |( |aa |( |vaa |8 |
|v(alue-1 |2 |a |null |null |vaaa |7 |
|v(alue-1 |3 |l |null |null |vaaal |6 |
|v(alue-1 |4 |u |null |null |vaaalu |5 |
|v(alue-1 |5 |e |null |null |vaaalue |4 |
|v(alue-1 |6 |- |gg |- |vaaaluegg |3 |
|v(alue-1 |7 |1 |null |null |vaaaluegg1 |2 |
|v(alue-1 |8 | |null |null |vaaaluegg1 |1 |
+-----------------+---+---+----------+-------+--------------------+---+

Let me know if this works for you.

How to replace all values of the same group with the minimum in PySpark

You can use window functions for this purpose.

from pyspark.sql import functions as F, Window

df2 = df.select(
    'title',
    'var',
    F.min('var').over(Window.partitionBy('title')).alias('min')
)

Or, for a simpler syntax:

df2 = df.selectExpr('title', 'var', 'min(var) over(partition by title) min')
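
If the goal is to overwrite var itself rather than add a min column, withColumn does the same job; a one-line sketch assuming the same df:

from pyspark.sql import functions as F, Window

# Replace every var with its group-wise minimum
df2 = df.withColumn('var', F.min('var').over(Window.partitionBy('title')))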

