Count Number of Non-NaN Entries in Each Column of a Spark DataFrame with PySpark

Count number of non-NaN entries in each column of Spark dataframe in PySpark

Let's start with some dummy data:

from pyspark.sql import Row

row = Row("v", "x", "y", "z")
df = sc.parallelize([
    row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
    row(None, None, 6, 7.0), row(float("NaN"), 8, 9, float("NaN"))
]).toDF()

## +----+----+---+---+
## |   v|   x|  y|  z|
## +----+----+---+---+
## | 0.0|   1|  2|3.0|
## |null|   3|  4|5.0|
## |null|null|  6|7.0|
## | NaN|   8|  9|NaN|
## +----+----+---+---+

All you need is a simple aggregation:

from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer:
    - False -> 0
    - True  -> 1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df.agg(*[count_not_null(c) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+

or, if you want to treat NaN as NULL:

df.agg(*[count_not_null(c, True) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+

You can also leverage SQL NULL semantics to achieve the same result without creating a custom function:

df.agg(*[
    count(c).alias(c)  # vertical (column-wise) operations in SQL ignore NULLs
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+

but this won't help with NaNs, because count treats NaN as a regular, non-NULL value.
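
If you would rather stick with count and its NULL semantics, one workaround (a sketch, not part of the original answer) is to turn NaN into NULL with when, so count skips both; this reproduces the nan_as_null=True counts:

from pyspark.sql.functions import col, count, isnan, when

# when(~isnan(c), col(c)) yields NULL for NaN entries (and passes NULLs through),
# so count() only counts values that are neither NULL nor NaN.
df.agg(*[count(when(~isnan(c), col(c))).alias(c) for c in df.columns]).show()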

If you prefer fractions:

exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+

or

# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+

Scala equivalent:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, isnan, lit, not, sum}

type JDouble = java.lang.Double

val df = Seq[(JDouble, JDouble, JDouble, JDouble)](
  (0.0, 1.0, 2.0, 3.0), (null, 3.0, 4.0, 5.0),
  (null, null, 6.0, 7.0), (java.lang.Double.NaN, 8.0, 9.0, java.lang.Double.NaN)
).toDF()

def count_not_null(c: Column, nanAsNull: Boolean = false) = {
  val pred = c.isNotNull and (if (nanAsNull) not(isnan(c)) else lit(true))
  sum(pred.cast("integer"))
}

df.select(df.columns map (c => count_not_null(col(c)).alias(c)): _*).show
// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// | 2| 3| 4| 4|
// +---+---+---+---+

df.select(df.columns map (c => count_not_null(col(c), true).alias(c)): _*).show
// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// | 1| 3| 4| 3|
// +---+---+---+---+

How to find count of Null and Nan values for each column in a PySpark dataframe efficiently?

You can use the method shown here and replace isNull with isnan:

from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
+-------+----------+---+
|session|timestamp1|id2|
+-------+----------+---+
|      0|         0|  3|
+-------+----------+---+

or

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
+-------+----------+---+
|session|timestamp1|id2|
+-------+----------+---+
|      0|         0|  5|
+-------+----------+---+

Count Non Null values in column in PySpark

Your first attempt filters out the rows with null in the Sales column before you do the aggregation, so it gives you the correct result.

But with the second snippet

df.groupBy('product') \
  .agg(F.count(F.col("Sales").isNotNull()).alias("sales_count")).show()

you haven't filtered anything out and are aggregating over the whole dataset. If you look closely, F.col("Sales").isNotNull() gives you a boolean column, i.e. true and false. So F.count(F.col("Sales").isNotNull()) just counts the boolean values in each group, which becomes evident if you create a new column as below.

df.withColumn("isNotNull", F.col("Sales").isNotNull()).show()

which would give you

+-----+----------+-------+---------+
|Sales|      date|product|isNotNull|
+-----+----------+-------+---------+
|  125|2012-10-10|     tv|     true|
|   20|2012-10-10|  phone|     true|
|   40|2012-10-10|     tv|     true|
| null|2012-10-10|     tv|    false|
+-----+----------+-------+---------+

So the counts are not correct with your second attempt: it simply counts every row in each group, whether or not Sales is null.

For your third attempt, .count() is an action and cannot be used inside an aggregation. Only expressions returning a Column can be used in .agg(); they can be built-in functions, UDFs, or your own functions.
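
For completeness, a minimal sketch (my rewrite, not part of the original answer) of an aggregation that counts only the non-null Sales values per product, relying on the fact that count on a column ignores NULLs:

import pyspark.sql.functions as F

# F.count(F.col("Sales")) counts only rows where Sales is not NULL in each group
df.groupBy("product").agg(F.count(F.col("Sales")).alias("sales_count")).show()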

How do I calculate the percentage of None or NaN values in Pyspark?

The following code does exactly what you asked:

from pyspark.sql.functions import *

df:

+----+----+
|   A|   B|
+----+----+
| 0.4| 0.3|
|null|null|
| 9.7|null|
|null|null|
+----+----+

# Generic solution for all columns
amount_missing_df = df.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in df.columns])
amount_missing_df.show()

amount_missing_df:

+---+----+
|  A|   B|
+---+----+
|0.5|0.75|
+---+----+
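
If you would rather consume the result as a plain Python dict (an assumption about how you want to use it, not part of the original answer), you can collect the single output row:

# amount_missing_df has exactly one row, so first().asDict() gives
# a mapping from column name to the fraction of missing values
missing_fractions = amount_missing_df.first().asDict()
# e.g. {'A': 0.5, 'B': 0.75}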

pyspark counting number of nulls per group

You can use groupBy and an aggregation function to get the required output.

from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local").getOrCreate()

# Sample dataframe
in_values = [("A", 1, None),
("A", 1, 20),
("B", None, None),
("A", None, None),
("B", 1, 100)]

in_df = spark.createDataFrame(in_values, "cat string, TS1 int, TS2 int")

columns = in_df.columns
# Ignoring groupBy column and considering cols which are required in aggregation
columns.remove("cat")
agg_expression = [sum(when(in_df[x].isNull(), 1).otherwise(0)).alias(x) for x in columns]

in_df.groupby("cat").agg(*agg_expression).show()

+---+---+---+
|cat|TS1|TS2|
+---+---+---+
|  B|  1|  1|
|  A|  1|  2|
+---+---+---+
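
If you also need to treat NaN as missing within each group (an extension of mine, not part of the original answer), the same pattern works with an extra isnan check; note that the integer columns in this sample can never actually hold NaN, so it only matters for float/double columns:

agg_expression = [
    sum(when(in_df[x].isNull() | isnan(in_df[x]), 1).otherwise(0)).alias(x)
    for x in columns
]
in_df.groupby("cat").agg(*agg_expression).show()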

Count the number of non-null values in a Spark DataFrame

Although I like Psidom's answer, I'm often more interested in the fraction of null values, because the number of non-null values alone doesn't tell you much...

You can do something like:

import org.apache.spark.sql.functions.{count, sum, when}

df.agg(
  (sum(when($"x".isNotNull, 0).otherwise(1)) / count("*")).as("x: fraction null"),
  (sum(when($"y".isNotNull, 0).otherwise(1)) / count("*")).as("y: fraction null"),
  (sum(when($"z".isNotNull, 0).otherwise(1)) / count("*")).as("z: fraction null")
).show()

EDIT: sum(when($"x".isNotNull,0).otherwise(1)) can also just be replaced by count($"x") which only counts non-null values. As I find this not obvious, I tend to use the sum notation which is more clear

Pyspark - Calculate number of null values in each dataframe column

You can use a list comprehension to loop over all of your columns in the agg, and use alias to rename the output column:

import pyspark.sql.functions as F

df_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])

However, this will return the results in one row as shown below:

df_agg.show()
#+--------+--------+--------+
#|Column_1|Column_2|Column_3|
#+--------+--------+--------+
#|      15|      56|      18|
#+--------+--------+--------+

If you wanted the results in one column instead, you could union each column from df_agg using functools.reduce as follows:

from functools import reduce
df_agg_col = reduce(
    lambda a, b: a.union(b),
    (
        df_agg.select(F.lit(c).alias("Column_Name"), F.col(c).alias("NULL_Count"))
        for c in df_agg.columns
    )
)
df_agg_col.show()
#+-----------+----------+
#|Column_Name|NULL_Count|
#+-----------+----------+
#|   Column_1|        15|
#|   Column_2|        56|
#|   Column_3|        18|
#+-----------+----------+

Or you can skip the intermediate step of creating df_agg and do:

df_agg_col = reduce(
    lambda a, b: a.union(b),
    (
        df.agg(
            F.count(F.when(F.isnull(c), c)).alias("NULL_Count")
        ).select(F.lit(c).alias("Column_Name"), "NULL_Count")
        for c in df.columns
    )
)

Pyspark: Need to show a count of null/empty values per each column in a dataframe

You can do the following; just make sure your df is a Spark DataFrame.

from pyspark.sql.functions import col, count, when

df.select(*(count(when(col(c).isNull(), c)).alias(c) for c in df.columns)).show()
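
The snippet above counts only NULLs. Since the question also mentions empty values, a possible extension (a sketch of mine, assuming "empty" means empty or whitespace-only strings) is:

from pyspark.sql.functions import col, count, trim, when

# Counts entries that are NULL, or that trim down to an empty string
df.select(*(
    count(when(col(c).isNull() | (trim(col(c)) == ""), c)).alias(c)
    for c in df.columns
)).show()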

