Spark DataFrame Grouping Does Not Count Nulls

spark dataframe grouping does not count nulls

Yes, count applied to a specific column does not count null values. If you want to include the null values, count all rows instead:

df.groupBy('a).agg(count("*")).show
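That snippet is Scala; a rough PySpark equivalent, assuming a DataFrame df with a grouping column a, would be:

from pyspark.sql import functions as F

# count("*") counts every row in the group, nulls included;
# F.count("some_column") would skip rows where that column is null.
df.groupBy("a").agg(F.count("*").alias("count")).show()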

pyspark counting number of nulls per group

You can use groupBy and an aggregation function to get the required output.

from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local").getOrCreate()

# Sample dataframe
in_values = [("A", 1, None),
("A", 1, 20),
("B", None, None),
("A", None, None),
("B", 1, 100)]

in_df = spark.createDataFrame(in_values, "cat string, TS1 int, TS2 int")

columns = in_df.columns
# Ignore the groupBy column and keep only the columns needed in the aggregation
columns.remove("cat")
agg_expression = [sum(when(in_df[x].isNull(), 1).otherwise(0)).alias(x) for x in columns]

in_df.groupby("cat").agg(*agg_expression).show()

+---+---+---+
|cat|TS1|TS2|
+---+---+---+
|  B|  1|  1|
|  A|  1|  2|
+---+---+---+

PySpark Dataframe Groupby and Count Null Values

You can just use the same logic and add a group by. Note that I also remove "year" from the aggregated columns, but that's optional (otherwise you would get two 'year' columns).

from pyspark.sql.functions import col, sum

columns = filter(lambda x: x != "year", df.columns)
df.groupBy("year")\
    .agg(*(sum(col(c).isNull().cast("int")).alias(c) for c in columns))\
    .show()

Count Non Null values in column in PySpark

Your first attempt filters out the rows with null in the Sales column before the aggregation, so it gives you the correct result.
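(The first attempt is not reproduced in this thread; presumably it followed the filter-then-aggregate pattern sketched below, assuming from pyspark.sql import functions as F.)

# hypothetical reconstruction of the first attempt: drop null Sales rows first, then count
df.filter(F.col("Sales").isNotNull()) \
    .groupBy('product') \
    .agg(F.count("Sales").alias("sales_count")).show()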

But consider your second snippet:

df.groupBy('product') \
.agg((F.count(F.col("Sales").isNotNull()).alias("sales_count"))).show()

Here you haven't filtered anything out; the aggregation runs over the whole dataset. If you look closely, F.col("Sales").isNotNull() gives you a boolean column of true and false values, so F.count(F.col("Sales").isNotNull()) just counts all of those boolean values in each group, which becomes evident if you create a new column as below.

df.withColumn("isNotNull", F.col("Sales").isNotNull()).show()

which would give you

+-----+----------+-------+---------+
|Sales|      date|product|isNotNull|
+-----+----------+-------+---------+
|  125|2012-10-10|     tv|     true|
|   20|2012-10-10|  phone|     true|
|   40|2012-10-10|     tv|     true|
| null|2012-10-10|     tv|    false|
+-----+----------+-------+---------+

So with your second attempt the count includes the false values as well, i.e. it counts every row in each group rather than only the non-null Sales values.

For your third attempt, .count() is an action and cannot be used inside an aggregation transformation. Only functions that return a Column can be used in .agg(); these can be built-in functions, UDFs, or your own functions.
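To actually count only the non-null Sales values per group, it is enough to pass a Column to the aggregate: F.count skips nulls by design, and a sum over a cast boolean gives the same result. A minimal sketch (the alias names here are made up):

df.groupBy('product') \
    .agg(F.count("Sales").alias("non_null_sales"),
         F.sum(F.col("Sales").isNotNull().cast("int")).alias("non_null_sales_alt")) \
    .show()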

How to find count of Null and Nan values for each column in a PySpark dataframe efficiently?

You can use the method shown here and replace isNull with isnan:

from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
+-------+----------+---+
|session|timestamp1|id2|
+-------+----------+---+
|      0|         0|  3|
+-------+----------+---+

or

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
+-------+----------+---+
|session|timestamp1|id2|
+-------+----------+---+
|      0|         0|  5|
+-------+----------+---+

Get first non-null values in group by (Spark 1.6)

For Spark 1.3 - 1.5, this could do the trick:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+

Edit

Apparently, in version 1.6 they changed the way the first aggregate function is processed. Now the underlying class First should be constructed with a second argument, an ignoreNullsExpr parameter, which is not yet used by the first aggregate function (as can be seen here). However, in Spark 2.0 you will be able to call agg(F.first(col, True)) to ignore nulls (as can be checked here).

Therefore, for Spark 1.6 the approach must be different and, unfortunately, a little less efficient. One idea is the following:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+

Maybe there is a better option. I'll edit the answer if I find one.
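For completeness, once you are on Spark 2.0+ the ignorenulls flag mentioned above makes this a one-liner; a minimal sketch against the same df:

from pyspark.sql import functions as F

df.groupBy('id').agg(
    F.first('code', ignorenulls=True).alias('code'),
    F.first('name', ignorenulls=True).alias('name')).show()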

Counting total rows, rows with null value, rows with zero values, and their ratios on PySpark

from pyspark.sql.functions import col, count, isnull, lit, sum, when

data = [
    ('abc_xxx', 1, 200, None, 100),
    ('def_xxx', 1, 0, 3, 40),
    ('ghi_xxx', 2, 300, 1, 2),
]

df = spark.createDataFrame(data, ['unique_id', 'group', 'value_1', 'value_2', 'value_3'])

# Flag rows that contain any null value and rows that contain any zero value
df = df\
    .withColumn('contains_null', when(isnull(col('value_1')) | isnull(col('value_2')) | isnull(col('value_3')), lit(1)).otherwise(lit(0)))\
    .withColumn('contains_zero', when((col('value_1')==0) | (col('value_2')==0) | (col('value_3')==0), lit(1)).otherwise(lit(0)))

df.groupBy('group')\
    .agg(count('unique_id').alias('total_rows'), sum('contains_null').alias('null_value_rows'), sum('contains_zero').alias('zero_value_rows')).show()

+-----+----------+---------------+---------------+
|group|total_rows|null_value_rows|zero_value_rows|
+-----+----------+---------------+---------------+
|    1|         2|              1|              1|
|    2|         1|              0|              0|
+-----+----------+---------------+---------------+

# total_count = (count('value_1') + count('value_2') + count('value_3'))
# null_count = (sum(when(isnull(col('value_1')), lit(1)).otherwise(lit(0)) + when(isnull(col('value_2')), lit(1)).otherwise(lit(0)) + when(isnull(col('value_3')), lit(1)).otherwise(lit(0))))
# zero_count = (sum(when(col('value_1')==0, lit(1)).otherwise(lit(0)) + when(col('value_2')==0, lit(1)).otherwise(lit(0)) + when(col('value_3')==0, lit(1)).otherwise(lit(0))))

# df.groupBy('group')\
# .agg(total_count.alias('total_numbers'), null_count.alias('null_values'), zero_count.alias('zero_values')).show()

#+-----+-------------+-----------+-----------+
#|group|total_numbers|null_values|zero_values|
#+-----+-------------+-----------+-----------+
#|    1|            5|          1|          1|
#|    2|            3|          0|          0|
#+-----+-------------+-----------+-----------+
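The question also asks for ratios; those can be derived from the same aggregation by dividing the null and zero row counts by total_rows. A minimal sketch reusing the imports and df above (the ratio column names are made up):

agg_df = df.groupBy('group')\
    .agg(count('unique_id').alias('total_rows'),
         sum('contains_null').alias('null_value_rows'),
         sum('contains_zero').alias('zero_value_rows'))

agg_df.withColumn('null_ratio', col('null_value_rows') / col('total_rows'))\
    .withColumn('zero_ratio', col('zero_value_rows') / col('total_rows'))\
    .show()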

get dataframe of groupby where all column entries are null

Let's try this with some Window functions:

from functools import reduce

from pyspark.sql import functions as F, Window as W

exclude_cols = ["cat1", "col1"]

# For every column outside exclude_cols, add a <col>_grp flag that is 1 if the
# cat1 group contains at least one non-null value in that column, else 0.
df = reduce(
    lambda a, b: a.withColumn(b["colName"], b["col"]),
    [
        {
            "colName": f"{col}_grp",
            "col": F.max(F.when(F.col(col).isNotNull(), 1).otherwise(0)).over(
                W.partitionBy("cat1")
            ),
        }
        for col in df.columns
        if col not in exclude_cols
    ],
    df,
)

df.show()
+----+----+----+----+----+--------+--------+--------+
|cat1|col1|col2|col3|col4|col2_grp|col3_grp|col4_grp|
+----+----+----+----+----+--------+--------+--------+
|   1|   1|null|null|null|       0|       0|       0|
|   1|   2|null|null|null|       0|       0|       0|
|   1|   3|null|null|null|       0|       0|       0|
|   3|   1|null|10.0|null|       0|       1|       1|
|   3|   2|null| 2.0|   2|       0|       1|       1|
|   3|   3|null|20.0|   4|       0|       1|       1|
|   2|   2|  60| 0.3|   6|       1|       1|       1|
|   2|   1|  50| 0.3|   2|       1|       1|       1|
+----+----+----+----+----+--------+--------+--------+

From this dataframe you can select the lines you need with a simple where:

# first dataframe
df.where(
    F.greatest(*(F.col(col) for col in df.columns if col.endswith("_grp"))) == 0
).show()
+----+----+----+----+----+--------+--------+--------+
|cat1|col1|col2|col3|col4|col2_grp|col3_grp|col4_grp|
+----+----+----+----+----+--------+--------+--------+
|   1|   1|null|null|null|       0|       0|       0|
|   1|   2|null|null|null|       0|       0|       0|
|   1|   3|null|null|null|       0|       0|       0|
+----+----+----+----+----+--------+--------+--------+

# second one (which theoretically should include ID 1 also)
df.where(
    F.least(*(F.col(col) for col in df.columns if col.endswith("_grp"))) == 0
).show()
+----+----+----+----+----+--------+--------+--------+
|cat1|col1|col2|col3|col4|col2_grp|col3_grp|col4_grp|
+----+----+----+----+----+--------+--------+--------+
|   1|   1|null|null|null|       0|       0|       0|
|   1|   2|null|null|null|       0|       0|       0|
|   1|   3|null|null|null|       0|       0|       0|
|   3|   1|null|10.0|null|       0|       1|       1|
|   3|   2|null| 2.0|   2|       0|       1|       1|
|   3|   3|null|20.0|   4|       0|       1|       1|
+----+----+----+----+----+--------+--------+--------+

Consider Non-Null value while performing groupBy operation using Spark-SQL

Here is the SQL version of the answer posted by @Robert Kossendey:

df.createOrReplaceTempView("tab")
spark.sql("select name, first(value_1, true), first(value_2, true) from tab group by name").show()

