Using monotonically_increasing_id() for assigning row numbers to a PySpark DataFrame

Edit: Full examples of the ways to do this and the risks can be found here

From the documentation

A column that generates monotonically increasing 64-bit integers.

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

Thus, it is not like an auto-increment id in RDBs and it is not reliable for merging.
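
To make that layout concrete, here is a minimal sketch that decodes the partition ID and the per-partition record number back out of the generated values (it assumes a running SparkSession named spark; the data is made up):

from pyspark.sql.functions import monotonically_increasing_id, expr

# made-up example: a few rows spread over 3 partitions
df = spark.range(6).repartition(3).withColumn("mid", monotonically_increasing_id())

(df
    .withColumn("partition_id", expr("shiftright(mid, 33)"))      # upper bits
    .withColumn("record_in_partition", expr("mid & 8589934591"))  # lower 33 bits (2^33 - 1)
    .show())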

If you need auto-increment behavior like in RDBs and your data is sortable, then you can use row_number:

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by some_column) as num, * from df')
+---+-----------+
|num|some_column|
+---+-----------+
|  1|    .......|
|  2|    .......|
|  3| ..........|
+---+-----------+
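
The same thing can also be written with the DataFrame API; a minimal sketch, assuming the column really is called some_column:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window.orderBy("some_column")
df.withColumn("num", row_number().over(w)).show()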

If your data is not sortable and you don't mind using RDDs to create the indexes and then converting back to dataframes, you can use rdd.zipWithIndex().

An example can be found here

In short:

# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# convert back to a dataframe
df = df.toDF()

df.show()

# your data | indexes
+---------------------+---+
|                   _1| _2|
+---------------------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+

You will probably need some more transformations after that to get your dataframe into the shape you need; one possibility is sketched below. Note: this is not a very performant solution.
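
For example, a sketch of those follow-up transformations that flattens each (Row, index) pair back into the original columns plus an index column (original_df is a hypothetical name for the dataframe before zipping):

indexed_df = (original_df.rdd
              .zipWithIndex()
              # unpack the (Row, index) pair into the original values plus the index
              .map(lambda pair: (*pair[0], pair[1]))
              .toDF(original_df.columns + ["index"]))
indexed_df.show()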

Hope this helps. Good luck!

Edit:
Come to think of it, you can combine monotonically_increasing_id with row_number:

from pyspark.sql.functions import monotonically_increasing_id

# create a monotonically increasing id
df = df.withColumn("idx", monotonically_increasing_id())

# the id is increasing but not consecutive, which means you can sort by it,
# so you can use `row_number`
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by idx) as num, * from df')

Not sure about performance though.

How do I add a persistent column of row ids to a Spark DataFrame?

Spark 2.0

  • This issue has been resolved in Spark 2.0 with SPARK-14241.

  • Another similar issue has been resolved in Spark 2.1 with SPARK-14393.

Spark 1.x

The problem you experience is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.

It doesn't take any parameters, so from an optimizer's perspective it doesn't matter when it is called, and it can be pushed after all other operations. Hence the behavior you see.

If you take a look at the code, you'll find out this is explicitly marked by extending the MonotonicallyIncreasingID expression with Nondeterministic.


I don't think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value. For example, with a UDF like this:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf, monotonically_increasing_id

bound = udf(lambda _, v: v, LongType())

(df
    .withColumn("rn", monotonically_increasing_id())
    # Due to nondeterministic behavior it has to be a separate step
    .withColumn("rn", bound("P", "rn"))
    .where("P"))
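
For illustration, a runnable sketch of the same idea on a made-up dataframe, where "P" stands in for the boolean filter column from the question:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf, monotonically_increasing_id

# made-up data; "P" is the filter predicate column
df = spark.createDataFrame([(1, True), (2, False), (3, True)], ["x", "P"])

bound = udf(lambda _, v: v, LongType())

(df
    .withColumn("rn", monotonically_increasing_id())
    # the artificial dependency on "P" keeps this step tied to the filtered value
    .withColumn("rn", bound("P", "rn"))
    .where("P")
    .show())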

In general, it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.


* The workaround shown above is no longer a valid solution (nor required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.

Why does adding a new id column with monotonically increasing id break after 352?

As per mic4ael's answer, the generated ID is only guaranteed to be monotonically increasing and unique, but not consecutive. You can generate consecutive and monotonically increasing IDs using row_number; however, this approach does not scale very well and should be avoided for larger datasets. For example, taking this as the input data:

from pyspark.sql.functions import col, row_number

df = spark.createDataFrame(
    [('33004', ''), ('33004', ''), ('33010', 'Muxia'), ('33020', 'Fuensanta'), ('33020', 'Fuensanta')],
    ("Zip", "PostalRegion"))

You can add a sequential ID column using the following approach:

from pyspark.sql.window import Window
w = Window().orderBy("PostalRegion")
df = df.select(row_number().over(w).alias("ID"), col("*"))
df.show()

This gives the following output:

+---+-----+------------+
| ID|  Zip|PostalRegion|
+---+-----+------------+
|  1|33004|            |
|  2|33004|            |
|  3|33020|   Fuensanta|
|  4|33020|   Fuensanta|
|  5|33010|       Muxia|
+---+-----+------------+
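
If the data is too large for a single global window, one alternative already described above is rdd.zipWithIndex(); a sketch applied to the same sample input (note that the numbering then follows the partition order of the data rather than a sort on PostalRegion):

# rebuild the sample input from above
df = spark.createDataFrame(
    [('33004', ''), ('33004', ''), ('33010', 'Muxia'), ('33020', 'Fuensanta'), ('33020', 'Fuensanta')],
    ("Zip", "PostalRegion"))

# add a 1-based sequential ID without sending all rows through one window partition
df_with_id = (df.rdd
              .zipWithIndex()
              .map(lambda pair: (pair[1] + 1, *pair[0]))
              .toDF(["ID"] + df.columns))
df_with_id.show()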

Creating a row number of each row in PySpark DataFrame using row_number() function with Spark version 2.2

You should define a column for the order clause. If you don't need to order the values, then use a dummy value. Try the below:

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))
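
A self-contained sketch of the same pattern on a small made-up dataframe (the data and column names are illustrative only):

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

df = spark.createDataFrame([("a",), ("b",), ("c",)], ["value"])

# order by a constant literal, since no real sort key is needed
w = Window().orderBy(lit("A"))
df.withColumn("row_num", row_number().over(w)).show()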

Spark Dataset unique id performance - row_number vs monotonically_increasing_id

monotonically_increasing_id is distributed and performs according to the partitioning of the data.

whereas

row_number() using a Window function without partitionBy (as in your case) is not distributed. When we don't define partitionBy, all the data is sent to one executor for generating the row numbers.

Thus, it is certain that monotonically_increasing_id() will perform better than row_number() without partitionBy defined.
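
One way to see the difference is to compare the physical plans; a quick sketch, assuming a running SparkSession named spark:

from pyspark.sql.functions import monotonically_increasing_id, row_number, lit
from pyspark.sql.window import Window

df = spark.range(10)

# stays fully distributed: each partition generates its own ids
df.withColumn("mid", monotonically_increasing_id()).explain()

# without partitionBy, the plan shows an exchange to a single partition
df.withColumn("rn", row_number().over(Window.orderBy(lit("A")))).explain()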

How to create a column with row number in pyspark

Use the row_number() function, ordering by monotonically_increasing_id():

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# add a monotonically increasing id, then number the rows by ordering on it
top_seller_elast_df = top_seller_elast_df.withColumn("mid", monotonically_increasing_id())

w = Window.orderBy("mid")
top_seller_elast_df.withColumn("row_number", row_number().over(w)).show()

adding a unique consecutive row number to dataframe in pyspark

I have found a solution and it's very simple.
Since I have no column in my dataframe that has the same value across all rows, using row_number with a partitionBy clause does not generate unique row numbers.

Let's add a new column to the existing dataframe with some default value in it.

from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

emp_df = emp_df.withColumn("new_column", lit("ABC"))

and create a window function with partitionBy using that column "new_column":

w = Window().partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

You will get the desired results:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|   NOIDA|     10000|      1|
|     2|RAGHVENDRA KUMAR ...| GURGAON|     50000|      2|
|     7|     AJAY SHARMA ...|   NOIDA|     70000|      3|
|     9|          ROBERT ...| GURGAON|     70000|      4|
|     4|   ABHIJAN SINHA ...|   SAKET|     65000|      5|
|     8|  SIDDHARTH BASU ...|   SAKET|     72000|      6|
|     5| SUPER DEVELOPER ...|     USA|     50000|      7|
|     3|  GOVIND NIMBHAL ...|  DWARKA|     92000|      8|
|     6|     RAJAT TYAGI ...|      UP|     65000|      9|
+------+--------------------+--------+----------+-------+

