Pyspark: Filter dataframe based on multiple conditions
Your logic condition is wrong. IIUC, what you want is:
import pyspark.sql.functions as f
df.filter(f.col('d') < 5) \
  .filter(
      (f.col('col1') != f.col('col3')) |
      ((f.col('col2') != f.col('col4')) & (f.col('col1') == f.col('col3')))
  ) \
  .show()
I broke the filter() step into two calls for readability, but you could equivalently do it in one line.
Output:
+----+----+----+----+---+
|col1|col2|col3|col4| d|
+----+----+----+----+---+
| A| xx| D| vv| 4|
| A| x| A| xx| 3|
| E| xxx| B| vv| 3|
| F|xxxx| F| vvv| 4|
| G| xxx| G| xx| 4|
+----+----+----+----+---+
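A side note on all the parentheses: in Python, & and | bind more tightly than comparison operators such as < or !=, so every comparison must be wrapped in its own parentheses or the condition silently parses differently. A quick pure-Python illustration:

```python
# & and | bind more tightly than comparisons, so without parentheses
# "x < 5 & y == 1" is parsed as the chained comparison "x < (5 & y) == 1".
x, y = 3, 1
without_parens = x < 5 & y == 1    # x < (5 & 1) == 1  ->  (3 < 1) and (1 == 1)
with_parens = (x < 5) & (y == 1)   # the intended meaning
print(without_parens, with_parens)  # False True
```

The same precedence rules apply to pyspark Column expressions, which is why PySpark filters need parentheses around every comparison.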
Multiple condition filter on dataframe
TL;DR To pass multiple conditions to filter or where, use Column objects and the logical operators (&, |, ~). See Pyspark: multiple conditions in when clause.
from pyspark.sql.functions import col
df.filter((col("act_date") >= "2016-10-01") & (col("act_date") <= "2017-04-01"))
You can also use a single SQL string:
df.filter("act_date >='2016-10-01' AND act_date <='2017-04-01'")
In practice it makes more sense to use between:
df.filter(col("act_date").between("2016-10-01", "2017-04-01"))
df.filter("act_date BETWEEN '2016-10-01' AND '2017-04-01'")
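Note that comparing dates as strings works here only because ISO-8601 strings (YYYY-MM-DD) sort lexicographically in the same order as the dates they represent; a quick pure-Python sanity check of that assumption:

```python
# YYYY-MM-DD strings compare lexicographically in chronological order,
# which is what makes the string-based >=, <= and between filters valid.
dates = ["2016-09-30", "2016-10-01", "2017-04-01", "2017-04-02"]
in_range = [d for d in dates if "2016-10-01" <= d <= "2017-04-01"]
print(in_range)  # ['2016-10-01', '2017-04-01']
```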
The first approach is not even remotely valid. In Python, the and operator returns:
- the last element if all expressions are "truthy";
- the first "falsey" element otherwise.
As a result (any non-empty string is truthy),
"act_date <='2017-04-01'" and "act_date >='2016-10-01'"
evaluates to:
"act_date >='2016-10-01'"
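You can verify this behaviour of Python's and directly:

```python
# Both operands are non-empty strings (truthy), so `and` evaluates to
# the last one; the first condition is silently discarded.
result = "act_date <='2017-04-01'" and "act_date >='2016-10-01'"
print(result)  # act_date >='2016-10-01'
```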
Pyspark compound filter, multiple conditions
Well, since @DataDog has clarified it, the code below replicates the filters put by the OP.
Note: every clause/sub-clause should be inside parentheses. If I have missed any, it is an inadvertent mistake, as I did not have the data to test it. But the idea remains the same.
matches = df.filter(
    ((df.business >= 0.9) & (df.city == 1) & (df.street >= 0.7)) |
    ((df.phone == 1) & (df.firstname == 1) & (df.street == 1) & (df.city == 1)) |
    ((df.business >= 0.9) & (df.street >= 0.9) & (df.city == 1)) |
    ((df.phone == 1) & (df.street == 1) & (df.city == 1)) |
    ((df.lastname >= 0.9) & (df.phone == 1) & (df.business >= 0.9) & (df.city == 1)) |
    ((df.phone == 1) & (df.street == 1) & (df.city == 1) & (df.busname >= 0.6))
)
Filter spark dataframe with multiple conditions on multiple columns in Pyspark
You can use the filter or where function of the DataFrame API. The equivalent code would be as follows:
df.filter(~((df.ID == 1) & (df.Event == 1)) &
~((df.ID == 2) & (df.Event == 2)) &
~((df.ID == 1) & (df.Event == 0)) &
~((df.ID == 2) & (df.Event == 0)))
filter pyspark on multiple conditions using AND OR
You may use where on your df:
df.where("""
(
col1='FALSE' AND
col2='Approved'
) OR
col1 <> 'FALSE'
""")
or
df.where(
(
(df.col1 == 'FALSE') &
(df.col2 == 'Approved')
)
|
(df.col1 != 'FALSE')
)
NB. we use & for AND and | for OR.
How filter pyspark dataframe when multiple dynamic criteria needs to be applied
You can use a left_semi join:
from pyspark.sql import functions as F
df = spark.createDataFrame([
("A01", "2021-01-01", 324), ("A01", "2021-01-02", 567), ("A01", "2021-01-03", 645),
("A01", "2021-01-04", 834), ("A02", "2021-01-01", 284), ("A02", "2021-01-02", 453),
("A02", "2021-01-03", 132), ("A04", "2021-01-04", 254)], ["ID", "Date", "SalesUSD"])
DfCriteria = spark.createDataFrame([
("A01", "2021-01-01", "2021-01-02"), ("A02", "2021-01-02", "2021-01-04")
], ["ID", "StartDate", "EndDate"])
result = df.join(
DfCriteria,
(df["ID"] == DfCriteria["ID"]) & F.col("Date").between(F.col("StartDate"), F.col("EndDate")),
'left_semi'
)
result.show()
# +---+----------+--------+
# | ID| Date|SalesUSD|
# +---+----------+--------+
# |A01|2021-01-01| 324|
# |A01|2021-01-02| 567|
# |A02|2021-01-02| 453|
# |A02|2021-01-03| 132|
# +---+----------+--------+
How to filter multiple rows based on rows and columns condition in pyspark
You need to use an or (|) instead of an and (&) operator when you combine the clauses:
import pyspark.sql.functions as F
import pyspark.sql.types as T
df = spark.createDataFrame([
("2020-11-02 08:51:50", "velocity", 1),
("2020-11-02 09:14:29", "Temp", 0),
("2020-11-02 09:18:32", "velocity", 0),
("2020-11-02 09:32:42", "velocity", 4),
("2020-11-03 13:06:03", "Temp", 2),
("2020-11-03 13:10:01", "Temp", 1),
("2020-11-03 13:54:38", "Temp", 5),
("2020-11-03 14:46:25", "velocity", 5),
("2020-11-03 14:57:31", "Kilometer",6),
("2020-11-03 15:07:07", "Kilometer", 7)],
["start_timestamp", "channel_name", "value"]).withColumn("start_timestamp", F.to_timestamp("start_timestamp"))
df_filtered = df.filter(
    ((df.channel_name == "velocity") & (df.value >= 1) & (df.value <= 5)) |  # or instead of and
    ((df.channel_name == "Temp") & (df.value >= 0) & (df.value <= 2))
)
df_filtered.show()
Output:
+-------------------+------------+-----+
| start_timestamp|channel_name|value|
+-------------------+------------+-----+
|2020-11-02 08:51:50| velocity| 1|
|2020-11-02 09:14:29| Temp| 0|
|2020-11-02 09:32:42| velocity| 4|
|2020-11-03 13:06:03| Temp| 2|
|2020-11-03 13:10:01| Temp| 1|
|2020-11-03 14:46:25| velocity| 5|
+-------------------+------------+-----+
The filter you currently apply will return nothing, since you first check whether channel_name equals one specific string and then whether it equals another. With or, only one of the clauses needs to be true for a row to be included in the result dataframe.
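The pitfall is easy to reproduce in plain Python: a single value can never equal two different strings at once, so and-ing the equality checks matches nothing, while or keeps rows satisfying either branch:

```python
rows = [("velocity", 1), ("Temp", 0), ("Kilometer", 7)]

# AND: channel_name would have to be both strings at once -> no matches.
and_matches = [r for r in rows if r[0] == "velocity" and r[0] == "Temp"]

# OR: a row matches if either branch holds.
or_matches = [r for r in rows if r[0] == "velocity" or r[0] == "Temp"]

print(and_matches)  # []
print(or_matches)   # [('velocity', 1), ('Temp', 0)]
```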
PySpark Dataframes: how to filter on multiple conditions with compact code?
You can use the or_ operator instead:
from operator import or_
from functools import reduce
newdf = df.where(reduce(or_, (df[c] > 0 for c in df.columns)))
EDIT: A more Pythonic solution:
from pyspark.sql.functions import lit

def any_(*preds):
    cond = lit(False)
    for pred in preds:
        cond = cond | pred
    return cond

newdf = df.where(any_(*[df[c] > 0 for c in df.columns]))
EDIT 2: Full example (Spark 2.1.0-SNAPSHOT, Python 3.5.1, SparkSession available as 'spark'):
In [1]: from pyspark.sql.functions import lit
In [2]: %paste
def any_(*preds):
    cond = lit(False)
    for pred in preds:
        cond = cond | pred
    return cond
## -- End pasted text --
In [3]: df = sc.parallelize([(1, 2, 3), (-1, -2, -3), (1, -1, 0)]).toDF()
In [4]: df.where(any_(*[df[c] > 0 for c in df.columns])).show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# | 1| 2| 3|
# | 1| -1| 0|
# +---+---+---+
In [5]: df[any_(*[df[c] > 0 for c in df.columns])].show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# | 1| 2| 3|
# | 1| -1| 0|
# +---+---+---+
In [6]: df.show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# | 1| 2| 3|
# | -1| -2| -3|
# | 1| -1| 0|
# +---+---+---+