How do I get a SQL row_number equivalent for a Spark RDD?
The row_number() over (partition by ... order by ...)
functionality was added to Spark 1.4. This answer uses PySpark/DataFrames.
Create a test DataFrame:
from pyspark.sql import Row, functions as F
testDF = sc.parallelize(
(Row(k="key1", v=(1,2,3)),
Row(k="key1", v=(1,4,7)),
Row(k="key1", v=(2,2,3)),
Row(k="key2", v=(5,5,5)),
Row(k="key2", v=(5,5,9)),
Row(k="key2", v=(7,5,5))
)
).toDF()
Add the partitioned row number:
from pyspark.sql.window import Window
(testDF
.select("k", "v",
F.rowNumber()
.over(Window
.partitionBy("k")
.orderBy("k")
)
.alias("rowNum")
)
.show()
)
+----+-------+------+
| k| v|rowNum|
+----+-------+------+
|key1|[1,2,3]| 1|
|key1|[1,4,7]| 2|
|key1|[2,2,3]| 3|
|key2|[5,5,5]| 1|
|key2|[5,5,9]| 2|
|key2|[7,5,5]| 3|
+----+-------+------+
SPARK code for sql case statement and row_number equivalent
You can use RDD.zipWithIndex, then convert it to a DataFrame, then use min() and join to get the results you want.
Like this:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// SORT BY added as per comment request
val test = sc.textFile("/user/hadoop/test.txt")
.sortBy(_.split(",")(2)).sortBy(_.split(",")(3).toInt)
// Table to hold the dept name lookups
val deptDF =
sc.parallelize(Array(("EDept1", "IT"),("EDept2", "ComSc"),("EDept3", "Mech")))
.toDF("deptCode", "dept")
val schema = StructType(Array(
StructField("col1", StringType, false),
StructField("col2", StringType, false),
StructField("col3", StringType, false),
StructField("col4", StringType, false),
StructField("col5", LongType, false))
)
// join to deptDF added as per comment
val testDF = sqlContext.createDataFrame(
test.zipWithIndex.map(tuple => Row.fromSeq(tuple._1.split(",") ++ Array(tuple._2))),
schema
)
.join(deptDF, $"col3" === $"deptCode")
.select($"col1", $"col2", $"dept" as "col3", $"col4", $"col5")
.orderBy($"col5")
testDF.show
col1 col2 col3 col4 col5
Eid1 EName1 IT 100 0
Eid3 EName3 IT 101 1
Eid2 EName2 IT 102 2
Eid4 EName4 ComSc 110 3
Eid5 EName5 ComSc 121 4
Eid6 EName6 Mech 99 5
val result = testDF.join(
testDF.groupBy($"col3").agg($"col3" as "g_col3", min($"col5") as "start"),
$"col3" === $"g_col3"
)
.select($"col1", $"col2", $"col3", $"col4", $"col5" - $"start" + 1 as "index")
result.show
col1 col2 col3 col4 index
Eid4 EName4 ComSc 110 1
Eid5 EName5 ComSc 121 2
Eid6 EName6 Mech 99 1
Eid1 EName1 IT 100 1
Eid3 EName3 IT 101 2
Eid2 EName2 IT 102 3
Spark SQL Row_number() PartitionBy Sort Desc
desc
should be applied on a column not a window definition. You can use either a method on a column:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
F.row_number().over(
Window.partitionBy("driver").orderBy(col("unit_count").desc())
)
or a standalone function:
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
F.row_number().over(
Window.partitionBy("driver").orderBy(desc("unit_count"))
)
PySpark - get row number for each row in a group
Use window function:
from pyspark.sql.window import *
from pyspark.sql.functions import row_number
df.withColumn("row_num", row_number().over(Window.partitionBy("Group").orderBy("Date")))
Filter RDD based on row_number
Don't worry about loading the rows/lines you don't need. When you do:
input = sc.textFile(inputFile)
you are not loading the file. You are just getting an object that will allow you to operate on the file. So to be efficient, it is better to think in terms of getting only what you want. For example:
header = input.take(1)[0]
rows = input.filter(lambda line: line != header)
Note that here I am not using an index to refer to the line I want to drop but rather its value. This has the side effect that other lines with this value will also be ignored but is more in the spirit of Spark as Spark will distribute your text file in different parts across the nodes and the concept of line numbers gets lost in each partition. This is also the reason why this is not easy to do in Spark(Hadoop) as each partition should be considered independent and a global line number would break this assumption.
If you really need to work with line numbers I recommend that you add them to the file outside of Spark(see here) and then just filter by this column inside of Spark.
Edit: Added zipWithIndex
solution as suggested by @Daniel Darabos.
sc.textFile('test.txt')\
.zipWithIndex()\ # [(u'First', 0), (u'Second', 1), ...
.filter(lambda x: x[1]!=5)\ # select columns
.map(lambda x: x[0])\ # [u'First', u'Second'
.collect()
Difference in dense rank and row number in spark
The difference is when there are "ties" in the ordering column. Check the example below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df = Seq(("a", 10), ("a", 10), ("a", 20)).toDF("col1", "col2")
val windowSpec = Window.partitionBy("col1").orderBy("col2")
df
.withColumn("rank", rank().over(windowSpec))
.withColumn("dense_rank", dense_rank().over(windowSpec))
.withColumn("row_number", row_number().over(windowSpec)).show
+----+----+----+----------+----------+
|col1|col2|rank|dense_rank|row_number|
+----+----+----+----------+----------+
| a| 10| 1| 1| 1|
| a| 10| 1| 1| 2|
| a| 20| 3| 2| 3|
+----+----+----+----------+----------+
Note that the value "10" exists twice in col2
within the same window (col1 = "a"
). That's when you see a difference between the three functions.
How to get nth row of Spark RDD?
I don't know how much it is efficient, as it depends on the current and future optimizations in the Spark's engine, but you can try doing the following:
rdd.zipWithIndex.filter(_._2==9).map(_._1).first()
The first function transforms the RDD into a pair (value, idx) with idx going from 0 onwards. The second function takes the element with idx==9 (the 10th). The third function takes the original value. Then the result is returned.
The first function could be pulled up by the execution engine and influence the behavior of the whole processing. Give it a try.
In any case, if n is very large, this method is efficient in that it does not require to collect an array of the first n elements in the driver node.
How to rollup rows in an RDD based on column and get sorted values
you can use sparkContext
's textFile
function to read as rdd
and then do the necessary parsing and follow the same footsteps as you have followed.
val rdd = sc.textFile("path to the csv file")
val result = rdd.zipWithIndex().map { case (line, index) => {
val array = line.split(",").map(_.trim)
(array(0), array(1), index)
}
}.groupBy(_._1)
.mapValues(x => x.toList.sortBy(_._3).map(_._2))
.collectAsMap()
This should give you your desired output as result
is
Map(key2 -> List(val1, val2, val3), key1 -> List(val1, val2))
Related Topics
"You Tried to Execute a Query That Does Not Include the Specified Aggregate Function"
How to Populate Calendar Table in Oracle
How to Debug Ora-01775: Looping Chain of Synonyms
SQL Speed Up Performance of Insert
Why Doesn't SQL Support "= Null" Instead of "Is Null"
How to Backup a Remote SQL Server Database to a Local Drive
Composite Primary Keys:Good or Bad
Ad Hoc Queries VS Stored Procedures VS Dynamic SQL
SQL Server: Cannot Insert an Explicit Value into a Timestamp Column
Regular Expression to Match Common SQL Syntax
Return Multiple Columns and Rows from a Function Postgresql Instead of Record
How to Alter the Position of a Column in a Postgresql Database Table
Get Month Name from Date in Oracle
Is There a Product Function Like There Is a Sum Function in Oracle SQL
MySQL #1140 - Mixing of Group Columns