Using monotonically_increasing_id() for assigning row number to pyspark dataframe
Edit: Full examples of the ways to do this and the risks can be found here
From the documentation:
A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
Thus, it is not like an auto-increment id in RDBs and it is not reliable for merging.
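To make that bit layout concrete, here is a minimal sketch that decodes the partition ID and the in-partition record number back out of the generated values (it assumes the documented implementation quoted above; the column name mono_id is just illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

df = spark.range(6).repartition(3).withColumn("mono_id", monotonically_increasing_id())
for row in df.collect():
    partition_id = row.mono_id >> 33               # upper 31 bits
    record_number = row.mono_id & ((1 << 33) - 1)  # lower 33 bits
    print(row.mono_id, partition_id, record_number)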
If you need auto-increment behavior like in RDBs and your data is sortable, then you can use row_number:
df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by some_column) as num, * from df')
+---+-----------+
|num|some_column|
+---+-----------+
|  1|  .........|
|  2|  .........|
|  3|  .........|
+---+-----------+
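For reference, the same numbering can be done through the DataFrame API instead of SQL; a minimal sketch, assuming the frame really has a sortable column named some_column:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# a global (unpartitioned) window: all rows go through a single task
w = Window.orderBy("some_column")
df.withColumn("num", row_number().over(w)).show()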
If your data is not sortable and you don't mind using RDDs to create the indexes and then falling back to DataFrames, you can use rdd.zipWithIndex()
An example can be found here
In short:
# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()
df.show()
# your data | indexes
+---------------------+---+
| _1 | _2|
+---------------------+---+
|[data col1,data col2]| 0|
|[data col1,data col2]| 1|
|[data col1,data col2]| 2|
+---------------------+---+
You will probably need some more transformations after that to get your dataframe to what you need it to be. Note: not a very performant solution.
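For instance, a sketch of those follow-up transformations, flattening each (Row, index) pair back into the original columns plus an idx column (original_df here is an assumed name for the frame before the conversion):
rdd_indexed = original_df.rdd.zipWithIndex()
df_indexed = rdd_indexed.map(lambda pair: (*pair[0], pair[1])) \
                        .toDF([*original_df.columns, "idx"])
df_indexed.show()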
Hope this helps. Good luck!
Edit:
Come to think of it, you can combine monotonically_increasing_id with row_number:
from pyspark.sql.functions import monotonically_increasing_id

# create a monotonically increasing id
df = df.withColumn("idx", monotonically_increasing_id())
# the id is increasing but not consecutive, which means you can sort by it,
# so you can use `row_number`
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by idx) as num, * from df')
Not sure about performance though.
How do I add a persistent column of row ids to a Spark DataFrame?
Spark 2.0
This issue has been resolved in Spark 2.0 with SPARK-14241.
Another similar issue has been resolved in Spark 2.1 with SPARK-14393.
Spark 1.x
The problem you experience is rather subtle, but can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.
It doesn't take any parameters, so from an optimizer perspective it doesn't matter when it is called, and it can be pushed after all other operations. Hence the behavior you see.
If you take a look at the code, you'll find this is explicitly marked by extending the MonotonicallyIncreasingID expression with Nondeterministic.
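For illustration, the problematic pattern looks like this (df and the boolean column P are placeholders; the described reordering applies to the affected Spark 1.x versions):
from pyspark.sql.functions import monotonically_increasing_id

# the optimizer may reorder the nondeterministic id assignment past the
# filter, so "rn" can end up computed on the already-filtered rows
df_bad = (df
    .withColumn("rn", monotonically_increasing_id())
    .where("P"))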
I don't think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value. For example, with a UDF like this:
from pyspark.sql.types import LongType
from pyspark.sql.functions import monotonically_increasing_id, udf

# identity UDF that forces a dependency on the filter column
bound = udf(lambda _, v: v, LongType())

(df
    .withColumn("rn", monotonically_increasing_id())
    # Due to nondeterministic behavior it has to be a separate step
    .withColumn("rn", bound("P", "rn"))
    .where("P"))
In general, it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.
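A minimal sketch of that alternative, reusing the boolean filter column P from the example above:
with_id = (df.rdd
    .zipWithIndex()
    .map(lambda pair: (*pair[0], pair[1]))
    .toDF([*df.columns, "rn"]))
# "rn" is already materialized, so filtering afterwards leaves it unchanged
stable = with_id.where("P")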
* The workaround shown above is no longer a valid solution (nor required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.
Why does adding a new id column with monotonically_increasing_id break after 352?
As per mic4ael's answer, the generated ID is only guaranteed to be monotonically increasing and unique, but not consecutive. You can generate consecutive and monotonically increasing ids using row_number; however, this approach does not scale very well and should be avoided for larger datasets. For example, taking this as the input data:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

df = spark.createDataFrame(
    [('33004', ''), ('33004', ''), ('33010', 'Muxia'),
     ('33020', 'Fuensanta'), ('33020', 'Fuensanta')],
    ("Zip", "PostalRegion"))
You can add a sequential ID column using the following approach:
w = Window.orderBy("PostalRegion")
df = df.select(row_number().over(w).alias("ID"), col("*"))
df.show()
This gives the following output:
+---+-----+------------+
| ID| Zip|PostalRegion|
+---+-----+------------+
| 1|33004| |
| 2|33004| |
| 3|33020| Fuensanta|
| 4|33020| Fuensanta|
| 5|33010| Muxia|
+---+-----+------------+
Creating a row number for each row in a PySpark DataFrame using the row_number() function with Spark version 2.2
You should define a column for the order clause. If you don't need to order the values, then use a dummy value. Try the following:
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

w = Window.orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))
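Note that ordering by a constant like lit('A') leaves the window unpartitioned, so all rows are routed through a single task to be numbered; that is fine for small frames but matters at scale (see the performance comparison below).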
Spark Dataset unique id performance - row_number vs monotonically_increasing_id
monotonically_increasing_id is distributed and performs according to the partitioning of the data, whereas row_number() using a Window function without partitionBy (as in your case) is not distributed. When we don't define partitionBy, all the data is sent to one executor for generating the row numbers.
Thus, it is certain that monotonically_increasing_id() will perform better than row_number() without partitionBy defined.
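As a sketch of the difference, adding a partitionBy keeps row_number() distributed, at the cost of the numbering restarting within each partition key (some_key and some_order_col are placeholder column names):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window.partitionBy("some_key").orderBy("some_order_col")
# numbers rows per key; partitions are processed in parallel across executors
df = df.withColumn("rn", row_number().over(w))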
How to create a column with row number in pyspark
Use the row_number() function, ordering by monotonically_increasing_id():
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

top_seller_elast_df = top_seller_elast_df.withColumn("mid", monotonically_increasing_id())
w = Window.orderBy("mid")
top_seller_elast_df.withColumn("row_number", row_number().over(w)).show()
adding a unique consecutive row number to dataframe in pyspark
I have found a solution, and it's very simple.
Since my dataframe has no column with the same value across all rows, using row_number with a partitionBy clause was not generating unique row numbers.
Let's add a new column to the existing dataframe with some default value in it:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

emp_df = emp_df.withColumn("new_column", lit("ABC"))
and create a window function with partitionBy using that column, "new_column":
w = Window.partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")
You will get the desired results:
+------+--------------------+--------+----------+-------+
|emp_id| emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
| 1|VIKRANT SINGH RAN...|NOIDA | 10000| 1|
| 2|RAGHVENDRA KUMAR ...|GURGAON | 50000| 2|
| 7|AJAY SHARMA ...|NOIDA | 70000| 3|
| 9|ROBERT ...|GURGAON | 70000| 4|
| 4|ABHIJAN SINHA ...|SAKET | 65000| 5|
| 8|SIDDHARTH BASU ...|SAKET | 72000| 6|
| 5|SUPER DEVELOPER ...|USA | 50000| 7|
| 3|GOVIND NIMBHAL ...|DWARKA | 92000| 8|
| 6|RAJAT TYAGI ...|UP | 65000| 9|
+------+--------------------+--------+----------+-------+
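Note that partitioning by a constant column still funnels every row into a single window partition, so this carries the same scaling caveat as the orderBy(lit('A')) approach above.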