How to Get a SQL row_number Equivalent for a Spark RDD

How do I get a SQL row_number equivalent for a Spark RDD?

The row_number() over (partition by ... order by ...) functionality was added in Spark 1.4. This answer uses PySpark DataFrames.

Create a test DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Add the partitioned row number:

from pyspark.sql.window import Window

(testDF
    .select("k", "v",
        # rowNumber() was renamed to row_number() in Spark 1.6 and removed in 2.0
        F.row_number()
        .over(Window
            .partitionBy("k")
            .orderBy("k")
        )
        .alias("rowNum")
    )
    .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
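
For readers without a cluster at hand, the semantics of row_number() over (partition by ... order by ...) can be sketched in plain Python. This is only an illustration, not Spark code: the helper name row_number and the dict-based rows are assumptions made for the example, and it orders by v inside each key (an assumption chosen to match the output shown above).

```python
from itertools import groupby
from operator import itemgetter


def row_number(rows, partition_key, order_key):
    """Return each row with a 1-based index that restarts in every partition."""
    # Sort by the partition column first, then by the ordering column inside it.
    ordered = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    numbered = []
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        for i, row in enumerate(group, start=1):
            numbered.append({**row, "rowNum": i})
    return numbered


# Same data as the test DataFrame, deliberately shuffled.
rows = [
    {"k": "key1", "v": (1, 2, 3)},
    {"k": "key2", "v": (5, 5, 9)},
    {"k": "key1", "v": (2, 2, 3)},
    {"k": "key2", "v": (5, 5, 5)},
    {"k": "key1", "v": (1, 4, 7)},
    {"k": "key2", "v": (7, 5, 5)},
]

for r in row_number(rows, "k", "v"):
    print(r["k"], r["v"], r["rowNum"])
```

The numbering restarts at 1 whenever the partition key changes, which is the behaviour the window definition asks Spark for.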

Spark code for SQL CASE statement and row_number equivalent

You can use RDD.zipWithIndex, then convert it to a DataFrame, then use min() and join to get the results you want.

Like this:

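import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Sort by department code (field 2), then numerically by field 3.
// A single sortBy with a composite key is used because the second of two
// chained sortBy calls would discard the ordering produced by the first.
val test = sc.textFile("/user/hadoop/test.txt")
  .sortBy { line =>
    val fields = line.split(",")
    (fields(2), fields(3).toInt)
  }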

// Table to hold the dept name lookups
val deptDF =
  sc.parallelize(Array(("EDept1", "IT"), ("EDept2", "ComSc"), ("EDept3", "Mech")))
    .toDF("deptCode", "dept")

val schema = StructType(Array(
  StructField("col1", StringType, false),
  StructField("col2", StringType, false),
  StructField("col3", StringType, false),
  StructField("col4", StringType, false),
  StructField("col5", LongType, false)
))

// join to deptDF added as per comment
val testDF = sqlContext.createDataFrame(
    test.zipWithIndex.map(tuple => Row.fromSeq(tuple._1.split(",") ++ Array(tuple._2))),
    schema
  )
  .join(deptDF, $"col3" === $"deptCode")
  .select($"col1", $"col2", $"dept" as "col3", $"col4", $"col5")
  .orderBy($"col5")

testDF.show

col1  col2    col3   col4  col5
Eid1  EName1  IT     100   0
Eid3  EName3  IT     101   1
Eid2  EName2  IT     102   2
Eid4  EName4  ComSc  110   3
Eid5  EName5  ComSc  121   4
Eid6  EName6  Mech   99    5

val result = testDF.join(
    testDF.groupBy($"col3").agg($"col3" as "g_col3", min($"col5") as "start"),
    $"col3" === $"g_col3"
  )
  .select($"col1", $"col2", $"col3", $"col4", $"col5" - $"start" + 1 as "index")

result.show

col1  col2    col3   col4  index
Eid4  EName4  ComSc  110   1
Eid5  EName5  ComSc  121   2
Eid6  EName6  Mech   99    1
Eid1  EName1  IT     100   1
Eid3  EName3  IT     101   2
Eid2  EName2  IT     102   3
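
The arithmetic behind this answer (global index, minus the smallest index in the group, plus one) may be easier to follow in a plain-Python sketch. The input lines below are an assumption reconstructed from the output shown above, and the variable names are illustrative only; this is not Spark code.

```python
# Hypothetical file contents, already sorted by department code and then col4.
lines = [
    "Eid1,EName1,EDept1,100",
    "Eid3,EName3,EDept1,101",
    "Eid2,EName2,EDept1,102",
    "Eid4,EName4,EDept2,110",
    "Eid5,EName5,EDept2,121",
    "Eid6,EName6,EDept3,99",
]
dept_names = {"EDept1": "IT", "EDept2": "ComSc", "EDept3": "Mech"}

# Step 1: attach a global 0-based index to every line (the zipWithIndex step).
indexed = [(line.split(","), idx) for idx, line in enumerate(lines)]

# Step 2: find the smallest global index per department (the groupBy + min step).
start = {}
for fields, idx in indexed:
    dept = fields[2]
    start[dept] = min(idx, start.get(dept, idx))

# Step 3: subtract the group's start and add 1 to get a per-group row number.
result = [
    (f[0], f[1], dept_names[f[2]], f[3], idx - start[f[2]] + 1)
    for f, idx in indexed
]

for row in result:
    print(*row)
```

The subtraction only yields gap-free numbers when the rows of each group are contiguous in the global order, which is why the data has to be sorted by the grouping column before the index is attached.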

Spark SQL Row_number() PartitionBy Sort Desc

desc should be applied to a column, not to a window definition. You can use either a method on a column:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

row_number().over(
    Window.partitionBy("driver").orderBy(col("unit_count").desc())
)

or a standalone function:

from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window

row_number().over(
    Window.partitionBy("driver").orderBy(desc("unit_count"))
)
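
A common reason for ordering the window in descending order is to pick the top row of each group. The plain-Python sketch below illustrates that idea under stated assumptions: the trips data and the helper row_number_desc are hypothetical and only mimic what the window expression computes.

```python
from collections import defaultdict


def row_number_desc(rows, partition_key, order_key):
    """Number rows inside each partition, largest order_key first."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row)

    numbered = []
    for key in sorted(groups):
        # reverse=True plays the role of desc("unit_count").
        ranked = sorted(groups[key], key=lambda r: r[order_key], reverse=True)
        for i, row in enumerate(ranked, start=1):
            numbered.append({**row, "row_num": i})
    return numbered


# Hypothetical data: several unit counts per driver.
trips = [
    {"driver": "d1", "unit_count": 3},
    {"driver": "d1", "unit_count": 9},
    {"driver": "d2", "unit_count": 5},
    {"driver": "d1", "unit_count": 6},
]

numbered = row_number_desc(trips, "driver", "unit_count")
top_per_driver = [r for r in numbered if r["row_num"] == 1]
print(top_per_driver)
```

Keeping only row_num == 1 leaves the row with the highest unit_count for each driver.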

PySpark - get row number for each row in a group

Use a window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df.withColumn("row_num", row_number().over(Window.partitionBy("Group").orderBy("Date")))

Filter RDD based on row_number

Don't worry about loading the rows/lines you don't need. When you do:

input = sc.textFile(inputFile)

you are not loading the file. You are just getting an object that will allow you to operate on the file. So to be efficient, it is better to think in terms of getting only what you want. For example:

header = input.take(1)[0]
rows = input.filter(lambda line: line != header)

Note that I am not using an index to refer to the line I want to drop, but its value. This has the side effect that any other line with the same value is dropped as well, but it is more in the spirit of Spark: Spark distributes the text file across the nodes in separate partitions, and the concept of a line number is lost within each partition. This is also why this is not easy to do in Spark (Hadoop): each partition should be considered independent, and a global line number would break that assumption.
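
The side effect mentioned above is easy to see in a small plain-Python sketch. The sample lines are hypothetical, and a list comprehension stands in for the RDD filter.

```python
# Hypothetical file contents in which the header text appears a second time.
lines = ["id,name", "1,alice", "2,bob", "id,name", "3,carol"]

header = lines[0]  # what input.take(1)[0] would return

# Filtering by value removes every line equal to the header, not just the first.
rows = [line for line in lines if line != header]

print(rows)
```

Both copies of the header line are gone, which is usually what you want when several files with identical headers are read together.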

If you really need to work with line numbers, I recommend adding them to the file outside of Spark and then filtering on that column inside Spark.

Edit: Added zipWithIndex solution as suggested by @Daniel Darabos.

(sc.textFile('test.txt')
    .zipWithIndex()                 # [(u'First', 0), (u'Second', 1), ...
    .filter(lambda x: x[1] != 5)    # drop the line whose index is 5
    .map(lambda x: x[0])            # [u'First', u'Second', ...
    .collect())
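
To see how a global index can exist even though each partition is independent, here is a simplified plain-Python sketch of the idea behind zipWithIndex: count the elements in each partition first, then offset every partition's local positions by the sizes of the partitions before it. The function zip_with_index and the nested lists standing in for partitions are assumptions for illustration, not Spark's actual implementation.

```python
from itertools import accumulate


def zip_with_index(partitions):
    """Pair every element with its global position across all partitions."""
    # Pass 1: count the elements in each partition.
    sizes = [len(part) for part in partitions]
    # The offset of a partition is the total size of all earlier partitions.
    offsets = [0] + list(accumulate(sizes))[:-1]
    # Pass 2: each partition numbers its own elements, shifted by its offset.
    return [
        [(value, offset + i) for i, value in enumerate(part)]
        for part, offset in zip(partitions, offsets)
    ]


# Hypothetical text file split into three partitions.
partitions = [["First", "Second", "Third"], ["Fourth", "Fifth"], ["Sixth", "Seventh"]]

indexed = zip_with_index(partitions)
# Same filter as above: drop the element whose global index is 5.
kept = [value for part in indexed for value, idx in part if idx != 5]
print(kept)
```

The extra counting pass is the price of a global index, which is why filtering by value is cheaper when it is good enough.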

Difference between dense_rank and row_number in Spark

The difference is when there are "ties" in the ordering column. Check the example below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(("a", 10), ("a", 10), ("a", 20)).toDF("col1", "col2")

val windowSpec = Window.partitionBy("col1").orderBy("col2")

df
  .withColumn("rank", rank().over(windowSpec))
  .withColumn("dense_rank", dense_rank().over(windowSpec))
  .withColumn("row_number", row_number().over(windowSpec))
  .show

+----+----+----+----------+----------+
|col1|col2|rank|dense_rank|row_number|
+----+----+----+----------+----------+
|   a|  10|   1|         1|         1|
|   a|  10|   1|         1|         2|
|   a|  20|   3|         2|         3|
+----+----+----+----------+----------+

Note that the value "10" exists twice in col2 within the same window (col1 = "a"). That's when you see a difference between the three functions.
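
The rule each function follows on ties can be written out in a few lines of plain Python. This is a sketch of the semantics only; rank_functions is a hypothetical helper, and it assumes the values of one window are already sorted by the ordering column.

```python
def rank_functions(sorted_values):
    """Return (value, rank, dense_rank, row_number) for one sorted window."""
    out = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = row_number   # rank jumps to the current position after a tie
            dense_rank += 1     # dense_rank only ever increases by one
            previous = value
        # row_number always advances, so tied rows still get distinct numbers.
        out.append((value, rank, dense_rank, row_number))
    return out


for row in rank_functions([10, 10, 20]):
    print(row)
```

For the window in the example this yields the same three columns as the table above: rank leaves a gap after the tie, dense_rank does not, and row_number ignores ties entirely.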

How to get nth row of Spark RDD?

I don't know how efficient this is, since it depends on current and future optimizations in Spark's engine, but you can try the following:

rdd.zipWithIndex.filter(_._2==9).map(_._1).first()

The first function transforms the RDD into pairs (value, idx), with idx starting at 0. The second function keeps the element with idx == 9 (the 10th). The third function extracts the original value. Finally, first() returns the result.

The first function could be pulled up by the execution engine and influence the behavior of the whole computation; zipWithIndex also has to run a Spark job of its own when the RDD has more than one partition. Give it a try.

In any case, if n is very large, this method is efficient in that it does not require collecting an array of the first n elements on the driver node.
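
The same chain of steps can be mirrored with lazy generators in plain Python, which also shows why no list of the first n elements has to be built. The helper nth is hypothetical and works on any iterable, not on an RDD.

```python
def nth(iterable, n):
    """Return the element at 0-based position n without building a list."""
    indexed = ((value, idx) for idx, value in enumerate(iterable))  # zipWithIndex
    matches = (pair for pair in indexed if pair[1] == n)            # filter(_._2 == n)
    values = (value for value, _ in matches)                        # map(_._1)
    return next(values)                                             # first()


# The 10th element (index 9) of a hypothetical data set.
print(nth(range(100, 200), 9))
```

Like first() on an empty RDD, this sketch raises an exception (StopIteration here) when n is past the end of the data.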

How to rollup rows in an RDD based on column and get sorted values

You can use SparkContext's textFile function to read the file as an RDD, do the necessary parsing, and then follow the same steps you have already followed.

val rdd = sc.textFile("path to the csv file")

val result = rdd.zipWithIndex().map { case (line, index) =>
    val array = line.split(",").map(_.trim)
    (array(0), array(1), index)
  }
  .groupBy(_._1)
  .mapValues(x => x.toList.sortBy(_._3).map(_._2))
  .collectAsMap()

This should give you the desired output, where result is:

Map(key2 -> List(val1, val2, val3), key1 -> List(val1, val2))
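
The same roll-up can be traced in plain Python to check the logic on a small sample. The input lines are an assumption chosen to reproduce the map shown above, and the names are illustrative only.

```python
from collections import defaultdict

# Hypothetical CSV lines: key, value. The line position defines the sort order.
lines = ["key1, val1", "key2, val1", "key1, val2", "key2, val2", "key2, val3"]

rolled = defaultdict(list)
for index, line in enumerate(lines):             # zipWithIndex
    key, value = [part.strip() for part in line.split(",")]
    rolled[key].append((index, value))           # groupBy on the key

# Sort each group by the original line index, then keep only the values.
result = {key: [v for _, v in sorted(pairs)] for key, pairs in rolled.items()}
print(result)
```

Carrying the index through the grouping is what guarantees that the values come out in file order, since a grouping step on a cluster does not promise to preserve it.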

