Pyspark: Filter Dataframe Based on Multiple Conditions

Pyspark: Filter dataframe based on multiple conditions

Your logical condition is wrong. IIUC, what you want is:

import pyspark.sql.functions as f

df.filter(f.col('d') < 5)\
    .filter(
        ((f.col('col1') != f.col('col3')) |
         (f.col('col2') != f.col('col4')) & (f.col('col1') == f.col('col3')))
    )\
    .show()

I broke the filter() step into 2 calls for readability, but you could equivalently do it in one line.
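
For example, the same logic combined into a single filter call (a sketch; the extra grouping only makes the implicit precedence of & over | explicit, so the output below is unchanged):

df.filter(
    (f.col('d') < 5) &
    ((f.col('col1') != f.col('col3')) |
     ((f.col('col2') != f.col('col4')) & (f.col('col1') == f.col('col3'))))
).show()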

Output:

+----+----+----+----+---+
|col1|col2|col3|col4|  d|
+----+----+----+----+---+
|   A|  xx|   D|  vv|  4|
|   A|   x|   A|  xx|  3|
|   E| xxx|   B|  vv|  3|
|   F|xxxx|   F| vvv|  4|
|   G| xxx|   G|  xx|  4|
+----+----+----+----+---+

Multiple condition filter on dataframe

TL;DR To pass multiple conditions to filter or where, use Column objects and logical operators (&, |, ~). See Pyspark: multiple conditions in when clause.

df.filter((col("act_date") >= "2016-10-01") & (col("act_date") <= "2017-04-01"))

You can also use a single SQL string:

df.filter("act_date >='2016-10-01' AND act_date <='2017-04-01'")

In practice it makes more sense to use between:

df.filter(col("act_date").between("2016-10-01", "2017-04-01"))
df.filter("act_date BETWEEN '2016-10-01' AND '2017-04-01'")

The first approach is not even remotely valid. In Python, the and operator returns:

  • The last element if all expressions are "truthy".
  • The first "falsey" element otherwise.

As a result

"act_date <='2017-04-01'" and "act_date >='2016-10-01'"

is evaluated to the following (since any non-empty string is truthy):

"act_date >='2016-10-01'"

Pyspark compound filter, multiple conditions

Well, since @DataDog has clarified it, the code below replicates the filters specified by the OP.

Note: each and every clause/sub-clause should be inside parentheses. If I have missed any, it is an inadvertent mistake, as I did not have the data to test against; the idea remains the same.

matches = df.filter(
    ((df.business >= 0.9) & (df.city == 1) & (df.street >= 0.7))
    |
    ((df.phone == 1) & (df.firstname == 1) & (df.street == 1) & (df.city == 1))
    |
    ((df.business >= 0.9) & (df.street >= 0.9) & (df.city == 1))
    |
    ((df.phone == 1) & (df.street == 1) & (df.city == 1))
    |
    ((df.lastname >= 0.9) & (df.phone == 1) & (df.business >= 0.9) & (df.city == 1))
    |
    ((df.phone == 1) & (df.street == 1) & (df.city == 1) & (df.busname >= 0.6))
)
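
The parentheses are needed because in Python the bitwise operators & and | bind more tightly than comparisons such as >= and ==; a minimal sketch, using two of the columns above, of what goes wrong without them:

# Without parentheses this parses as df.business >= (0.9 & df.city) == 1,
# a chained comparison on Column objects that is not the intended condition and fails.
# df.filter(df.business >= 0.9 & df.city == 1)    # wrong
df.filter((df.business >= 0.9) & (df.city == 1))  # correct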

Filter spark dataframe with multiple conditions on multiple columns in Pyspark

You can use the filter or where function of the DataFrame API.

The equivalent code would be as follows:

df.filter(~((df.ID == 1) & (df.Event == 1)) &
          ~((df.ID == 2) & (df.Event == 2)) &
          ~((df.ID == 1) & (df.Event == 0)) &
          ~((df.ID == 2) & (df.Event == 0)))
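
Since filter also accepts a SQL expression string, a sketch of the same logic in that form would be:

df.filter(
    "NOT (ID = 1 AND Event = 1) AND NOT (ID = 2 AND Event = 2) AND "
    "NOT (ID = 1 AND Event = 0) AND NOT (ID = 2 AND Event = 0)"
)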

filter pyspark on multiple conditions using AND OR

You may use where on your df:

df.where("""
(
col1='FALSE' AND
col2='Approved'
) OR
col1 <> 'FALSE'
""")

or

df.where(
    (
        (df.col1 == 'FALSE') &
        (df.col2 == 'Approved')
    )
    |
    (df.col1 != 'FALSE')
)

N.B. we use & for AND and | for OR.
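
There is also ~ for NOT; a minimal sketch equivalent to the != comparison above:

df.where(~(df.col1 == 'FALSE'))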

How filter pyspark dataframe when multiple dynamic criteria needs to be applied

You can use a left_semi join:

from pyspark.sql import functions as F

df = spark.createDataFrame([
    ("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645),
    ("A01", "2021-01-04", 834), ("A02", "2021-01-01", 284), ("A02", "2021-01-02", 453),
    ("A02", "2021-01-03", 132), ("A04", "2021-01-04", 254)], ["ID", "Date", "SalesUSD"])

DfCriteria = spark.createDataFrame([
    ("A01", "2021-01-01", "2021-01-02"), ("A02", "2021-01-02", "2021-01-04")
], ["ID", "StartDate", "EndDate"])

result = df.join(
    DfCriteria,
    (df["ID"] == DfCriteria["ID"]) & F.col("Date").between(F.col("StartDate"), F.col("EndDate")),
    'left_semi'
)

result.show()

# +---+----------+--------+
# | ID|      Date|SalesUSD|
# +---+----------+--------+
# |A01|2021-01-01|     324|
# |A01|2021-01-02|     567|
# |A02|2021-01-02|     453|
# |A02|2021-01-03|     132|
# +---+----------+--------+

How to filter multiple rows based on rows and columns condition in pyspark

You need to use an or (|) instead of an and (&) operator when you combine the clauses:

import pyspark.sql.functions as F
import pyspark.sql.types as T

df = spark.createDataFrame([
    ("2020-11-02 08:51:50", "velocity", 1),
    ("2020-11-02 09:14:29", "Temp", 0),
    ("2020-11-02 09:18:32", "velocity", 0),
    ("2020-11-02 09:32:42", "velocity", 4),
    ("2020-11-03 13:06:03", "Temp", 2),
    ("2020-11-03 13:10:01", "Temp", 1),
    ("2020-11-03 13:54:38", "Temp", 5),
    ("2020-11-03 14:46:25", "velocity", 5),
    ("2020-11-03 14:57:31", "Kilometer", 6),
    ("2020-11-03 15:07:07", "Kilometer", 7)],
    ["start_timestamp", "channel_name", "value"]
).withColumn("start_timestamp", F.to_timestamp("start_timestamp"))

df_filtered = df.filter(
    ((df.channel_name == "velocity") & (df.value >= 1) & (df.value <= 5)) |  # or instead of and
    ((df.channel_name == "Temp") & (df.value >= 0) & (df.value <= 2))
)
df_filtered.show()

Output:

+-------------------+------------+-----+
|    start_timestamp|channel_name|value|
+-------------------+------------+-----+
|2020-11-02 08:51:50|    velocity|    1|
|2020-11-02 09:14:29|        Temp|    0|
|2020-11-02 09:32:42|    velocity|    4|
|2020-11-03 13:06:03|        Temp|    2|
|2020-11-03 13:10:01|        Temp|    1|
|2020-11-03 14:46:25|    velocity|    5|
+-------------------+------------+-----+

The filter you currently apply will return nothing, since it requires channel_name to equal one specific string and, at the same time, another. With or, only one of the clauses has to be true for a row to be included in the result dataframe.

PySpark Dataframes: how to filter on multiple conditions with compact code?

You can use the or_ operator instead:

from operator import or_
from functools import reduce

newdf = df.where(reduce(or_, (df[c] > 0 for c in df.columns)))
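
The same pattern should work for a conjunction as well; a sketch, assuming you instead want only the rows where every column is positive:

from operator import and_
from functools import reduce

newdf_all = df.where(reduce(and_, (df[c] > 0 for c in df.columns)))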

EDIT: A more Pythonic solution:

from pyspark.sql.functions import lit

def any_(*preds):
    cond = lit(False)
    for pred in preds:
        cond = cond | pred
    return cond

newdf = df.where(any_(*[df[c] > 0 for c in df.columns]))

EDIT 2: Full example:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Using Python version 3.5.1 (default, Dec 7 2015 11:16:01)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.functions import lit

In [2]: %paste
def any_(*preds):
    cond = lit(False)
    for pred in preds:
        cond = cond | pred
    return cond

## -- End pasted text --

In [3]: df = sc.parallelize([(1, 2, 3), (-1, -2, -3), (1, -1, 0)]).toDF()

In [4]: df.where(any_(*[df[c] > 0 for c in df.columns])).show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# |  1|  2|  3|
# |  1| -1|  0|
# +---+---+---+

In [5]: df[any_(*[df[c] > 0 for c in df.columns])].show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# |  1|  2|  3|
# |  1| -1|  0|
# +---+---+---+

In [6]: df.show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# |  1|  2|  3|
# | -1| -2| -3|
# |  1| -1|  0|
# +---+---+---+

