Filtering a Pyspark Dataframe Using Isin by Exclusion

Filtering a pyspark dataframe using isin by exclusion

It looks like the ~ operator provides the functionality I need, but I have yet to find any documentation for it.

from pyspark.sql.functions import col

df.filter(~col('bar').isin(['a', 'b'])).show()



+---+---+
| id|bar|
+---+---+
| 4| c|
| 5| d|
+---+---+
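
For reference, here is a self-contained sketch of the same pattern; the input rows are made up (only ids 4 and 5 are visible in the output above), so treat the data as purely illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# hypothetical data: rows whose bar is 'a' or 'b' get excluded
df = spark.createDataFrame(
    [(1, 'a'), (2, 'b'), (3, 'a'), (4, 'c'), (5, 'd')],
    ['id', 'bar'],
)

# ~ negates the isin condition, i.e. "bar NOT IN ('a', 'b')"
df.filter(~col('bar').isin(['a', 'b'])).show()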

Pyspark dataframe operator "IS NOT IN"

In pyspark you can do it like this:

array = [1, 2, 3]
dataframe.filter(dataframe.column.isin(array) == False)

Or using the ~ (NOT) operator:

dataframe.filter(~dataframe.column.isin(array))

Pyspark: How to filter on list of two column value pairs?

Here's a way to do it without a join: chain a set of conditions in the filter so that each row is compared against the value pairs in flist. It also handles nulls.

from functools import reduce
import pyspark.sql.functions as F

flist = [(1, 'A'), (None, 2), (1, None)]

df2 = df.filter(
    reduce(
        lambda x, y: x | y,
        [
            ((F.col('col1') == col1) if col1 is not None else F.col('col1').isNull()) &
            ((F.col('col2') == col2) if col2 is not None else F.col('col2').isNull())
            for (col1, col2) in flist
        ]
    )
)

df2.show()
+----+----+
|col1|col2|
+----+----+
| 1| A|
|null| 2|
| 1|null|
+----+----+
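
For this particular flist, the reduce simply ORs together one pairwise condition per tuple. Written out by hand (using the same pyspark.sql.functions as F import as above), the filter is equivalent to:

df2 = df.filter(
    ((F.col('col1') == 1) & (F.col('col2') == 'A')) |
    (F.col('col1').isNull() & (F.col('col2') == 2)) |
    ((F.col('col1') == 1) & F.col('col2').isNull())
)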

Pyspark: Filter dataframe based on multiple conditions

Your logical condition is wrong. IIUC, what you want is:

import pyspark.sql.functions as f

df.filter(f.col('d') < 5)\
    .filter(
        (f.col('col1') != f.col('col3')) |
        ((f.col('col2') != f.col('col4')) & (f.col('col1') == f.col('col3')))
    )\
    .show()

I broke the filter() step into two calls for readability, but you could equivalently do it in one line; see the sketch after the output below.

Output:

+----+----+----+----+---+
|col1|col2|col3|col4| d|
+----+----+----+----+---+
| A| xx| D| vv| 4|
| A| x| A| xx| 3|
| E| xxx| B| vv| 3|
| F|xxxx| F| vvv| 4|
| G| xxx| G| xx| 4|
+----+----+----+----+---+
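
As mentioned above, the two chained filter() calls can be collapsed into one; a sketch of the single-call equivalent (same conditions, joined with &):

import pyspark.sql.functions as f

df.filter(
    (f.col('d') < 5) &
    (
        (f.col('col1') != f.col('col3')) |
        ((f.col('col2') != f.col('col4')) & (f.col('col1') == f.col('col3')))
    )
).show()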

Filter Spark DataFrame based on another DataFrame that specifies denylist criteria

You'll need to use a left_anti join in this case.

The left anti join is the opposite of a left semi join.

It removes from the left table the rows whose key has a match in the right table:

largeDataFrame
.join(smallDataFrame, Seq("some_identifier"),"left_anti")
.show
// +---------------+----------+
// |some_identifier|first_name|
// +---------------+----------+
// | 222| mary|
// | 111| bob|
// +---------------+----------+
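
For a PySpark DataFrame the same left anti join can be written as follows (a sketch, assuming the same largeDataFrame / smallDataFrame names and a shared some_identifier column):

# keep only the rows of largeDataFrame whose some_identifier has no match in smallDataFrame
largeDataFrame.join(smallDataFrame, on='some_identifier', how='left_anti').show()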

How to exclude elements contained in another column - Pyspark DataFrame

The problem is how you're using isin. For better or worse, isin can't take another PySpark Column object as input; it needs an actual collection. So one thing you could do is convert your column to a list:

col_values = df.select("C").rdd.flatMap(lambda x: x).collect()
df.filter(~df.B.isin(col_values))

Performance-wise, though, this is not ideal, as the driver now has to hold and manipulate the entire contents of the column you've just collected into memory. You could instead use a left anti join to get the result you need without turning anything into a list, keeping the efficiency of Spark's distributed computing:

df0 = df[["C"]].withColumnRenamed("C", "B")
df.join(df0, "B", "leftanti").show()

Thanks to Emma in the comments for her contribution.
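
A minimal end-to-end sketch of the join approach, with made-up values for columns B and C:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical data: keep the rows whose B value never appears anywhere in column C
df = spark.createDataFrame(
    [('u1', 'u3'), ('u2', 'u1'), ('u4', 'u3')],
    ['B', 'C'],
)

df0 = df[['C']].withColumnRenamed('C', 'B')
df.join(df0, 'B', 'leftanti').show()
# only the B = 'u2' and B = 'u4' rows remain, since 'u1' occurs in column C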

Pyspark filtering items in column of lists

I suggest 1) making a single-column DataFrame of your urls using explode, 2) using posexplode to make a three-column DataFrame of your query, href, and the index position of each href, and then 3) inner joining the two.

  1. Create DataFrame of urls
from pyspark.sql.functions import explode, posexplode

urls = [
    (['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8'],),
]
refs = (
    spark.createDataFrame(urls, ['ref']).
    select(
        explode('ref')
    )
)
refs.show(truncate=False)
# +----+
# |col |
# +----+
# |url1|
# |url2|
# |url3|
# |url4|
# |url5|
# |url6|
# |url7|
# |url8|
# +----+

  2. Create the example data you provided
data = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
df = spark.createDataFrame(data, ["query", "href"])
df.show(truncate=False)
# +-----+----------------------------------+
# |query|href |
# +-----+----------------------------------+
# |q1 |[url7, url11, url12, url13, url14]|
# |q2 |[url1, url3, url5, url6] |
# |q3 |[url1, url2, url8] |
# +-----+----------------------------------+

  3. Solution
(
    df.
    select(
        'query',
        posexplode('href')
    ).
    join(
        refs,
        'col',
        'inner'
    ).
    orderBy('col', 'query').
    show(truncate=False)
)
# +----+-----+---+
# |col |query|pos|
# +----+-----+---+
# |url1|q2 |0 |
# |url1|q3 |0 |
# |url2|q3 |1 |
# |url3|q2 |1 |
# |url5|q2 |2 |
# |url6|q2 |3 |
# |url7|q1 |0 |
# |url8|q3 |2 |
# +----+-----+---+

How to use NOT IN clause in filter condition in spark

Since your code isn't reproducible, here is a small example using spark-sql showing how to do select * from t where id in (...):

// create a DataFrame for a range 'id' from 1 to 9.
scala> val df = spark.range(1,10).toDF
df: org.apache.spark.sql.DataFrame = [id: bigint]

// values to exclude
scala> val f = Seq(5,6,7)
f: Seq[Int] = List(5, 6, 7)

// select * from df where id is not in the values to exclude
scala> df.filter(!col("id").isin(f : _*)).show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 8|
| 9|
+---+

// select * from df where id is in the values to exclude
scala> df.filter(col("id").isin(f : _*)).show

Here is the RDD version of the not-isin filter:

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val f = Seq(5,6,7)
f: Seq[Int] = List(5, 6, 7)

scala> val rdd2 = rdd.filter(x => !f.contains(x))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at <console>:28

Nevertheless, I still believe this is overkill since you are already using spark-sql.

It seems that in your case you are actually dealing with DataFrames, so the solutions mentioned above don't apply.

You can use the left anti join approach:

scala> val source = spark.read.format("csv").load("source.file")
source: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]

scala> val destination = spark.read.format("csv").load("destination.file")
destination: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 9 more fields]

scala> source.show
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|_c0| _c1| _c2| _c3| _c4|_c5|_c6| _c7| _c8| _c9| _c10|
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
| 1| Ravi kumar| Ravi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 2|Shekhar shudhanshu| Shekhar|shudhanshu| Manulife | 2| M|18-01-1994|76.34| 250000| Alaska |
| 3|Preethi Narasingam| Preethi|Narasingam| Retail | 3| F|19-01-1994|77.45|270000.01| Arizona |
| 4| Abhishek Nair|Abhishek| Nair| Banking | 4| M|20-01-1994|78.65| 345000| Arkansas |
| 5| Ram Sharma| Ram| Sharma|Infrastructure | 5| M|21-01-1994|79.12| 45000| California |
| 6| Chandani Kumari|Chandani| Kumari| BNFS | 6| F|22-01-1994|80.13| 43000.02| Colorado |
| 7| Balaji Kumar| Balaji| Kumar| MSO | 1| M|23-01-1994|81.33| 1234678|Connecticut |
| 8| Naveen Shekrappa| Naveen| Shekrappa| Manulife | 2| M|24-01-1994| 100| 789414| Delaware |
| 9| Milind Chavan| Milind| Chavan| Retail | 3| M|25-01-1994|83.66| 245555| Florida |
| 10| Raghu Rajeev| Raghu| Rajeev| Banking | 4| M|26-01-1994|87.65| 235468| Georgia|
+---+------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+


scala> destination.show
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
|_c0| _c1| _c2| _c3| _c4|_c5|_c6| _c7| _c8| _c9| _c10|
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+
| 1| Ravi kumar| Revi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 1| Ravi1 kumar| Revi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 1| Ravi2 kumar| Revi | kumar| MSO | 1| M|17-01-1994| 74.5| 24000.78| Alabama |
| 2| Shekhar shudhanshu| Shekhar|shudhanshu| Manulife | 2| M|18-01-1994|76.34| 250000| Alaska |
| 3|Preethi Narasingam1| Preethi|Narasingam| Retail | 3| F|19-01-1994|77.45|270000.01| Arizona |
| 4| Abhishek Nair1|Abhishek| Nair| Banking | 4| M|20-01-1994|78.65| 345000| Arkansas |
| 5| Ram Sharma| Ram| Sharma|Infrastructure | 5| M|21-01-1994|79.12| 45000| California |
| 6| Chandani Kumari|Chandani| Kumari| BNFS | 6| F|22-01-1994|80.13| 43000.02| Colorado |
| 7| Balaji Kumar| Balaji| Kumar| MSO | 1| M|23-01-1994|81.33| 1234678|Connecticut |
| 8| Naveen Shekrappa| Naveen| Shekrappa| Manulife | 2| M|24-01-1994| 100| 789414| Delaware |
| 9| Milind Chavan| Milind| Chavan| Retail | 3| M|25-01-1994|83.66| 245555| Florida |
| 10| Raghu Rajeev| Raghu| Rajeev| Banking | 4| M|26-01-1994|87.65| 235468| Georgia|
+---+-------------------+--------+----------+---------------+---+---+----------+-----+---------+------------+

You'll just need to do the following:

scala> val res1 = source.join(destination, Seq("_c0"), "leftanti")

scala> val res2 = destination.join(source, Seq("_c0"), "leftanti")

It's the same logic I mentioned in my answer here.

Check if values of column pyspark df exist in other column pyspark df

You can collect all the values in column1 of df_B, turn them into a broadcast variable, and then write a UDF against it:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

# collect the distinct values of column1 from df_B and broadcast them as a set
df_B_col_1_values = df_B.rdd.map(lambda x: x.column1).distinct().collect()
df_B_col_1_values = sc.broadcast(set(df_B_col_1_values))

# the UDF checks membership in the broadcast set
my_udf = udf(lambda x: x in df_B_col_1_values.value, BooleanType())
df_A = df_A.withColumn('column1_present', my_udf(col('column1')))
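
As a side note, the same boolean column can usually be built without collecting anything to the driver, by joining a flag onto df_A; a sketch under the same column-name assumptions:

from pyspark.sql import functions as F

# distinct column1 values from df_B, tagged with a marker column
flags = df_B.select('column1').distinct().withColumn('column1_present', F.lit(True))

# left join and turn the unmatched nulls into False
df_A = (
    df_A.join(flags, on='column1', how='left')
        .withColumn('column1_present', F.coalesce(F.col('column1_present'), F.lit(False)))
)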

