How to Select Last Row and Also How to Access Pyspark Dataframe by Index

How to select last row and also how to access PySpark dataframe by index?

How to get the last row.

Long and ugly way which assumes that all columns are orderable:

from pyspark.sql.functions import (
col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max_(struct("_id", *df.columns)).alias("tmp"))
    .select(col("tmp.*"))
    .drop("_id"))

If not all columns can be ordered you can try:

# Attach a monotonically increasing id, find its maximum, then keep only that row.
with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]

with_id.where(col("_id") == i).drop("_id")

Note: there is a last function in pyspark.sql.functions / o.a.s.sql.functions, but considering the description of the corresponding expressions it is not a good choice here (its result is non-deterministic unless the data is explicitly ordered).

How can I access the dataframe rows by index?

You cannot. A Spark DataFrame is not accessible by index. You can add indices using zipWithIndex and filter later, as sketched below. Just keep in mind that this is an O(N) operation.
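A minimal sketch of that approach (the _idx column name is my own choice): add an index with zipWithIndex on the underlying RDD, convert back to a DataFrame, and filter for the position you want.

# Sketch: index rows through the RDD API, then filter -- a full O(N) pass.
indexed = (df.rdd
    .zipWithIndex()                          # pairs of (Row, index)
    .map(lambda pair: pair[0] + (pair[1],))  # append the index to each row's values
    .toDF(df.columns + ["_idx"]))

row_5 = indexed.where(indexed["_idx"] == 5).drop("_idx")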

Is there a way to slice dataframe based on index in pyspark?

Short Answer

If you already have an index column (suppose it was called 'id') you can filter using pyspark.sql.Column.between:

from pyspark.sql.functions import col
df.where(col("id").between(5, 10))

If you don't already have an index column, you can add one yourself and then use the code above. You should have some ordering built into your data based on some other column (orderBy("someColumn")), for example as sketched below.
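For example, a sketch that builds such an index with row_number (the column name someColumn is assumed to provide the ordering):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

w = Window.orderBy("someColumn")                        # ordering taken from an existing column
df_indexed = df.withColumn("id", row_number().over(w))

df_indexed.where(col("id").between(5, 10))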


Full Explanation

No, it is not easily possible to slice a Spark DataFrame by index, unless the index is already present as a column.

Spark DataFrames are inherently unordered and do not support random access. (There is no concept of a built-in index as there is in pandas). Each row is treated as an independent collection of structured data, and that is what allows for distributed parallel processing. Thus, any executor can take any chunk of the data and process it without regard for the order of the rows.

Now obviously it is possible to perform operations that do involve ordering (lead, lag, etc.), but these will be slower because they require Spark to shuffle data between the executors. (The shuffling of data is typically one of the slowest components of a Spark job.)
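As an illustration, here is a sketch of one such ordering-dependent operation (the ts and value column names are assumptions for the example); with no partitionBy, the whole DataFrame is shuffled into a single partition:

from pyspark.sql import Window
from pyspark.sql.functions import lag

w = Window.orderBy("ts")                           # global ordering forces a shuffle
df.withColumn("prev_value", lag("value").over(w))  # previous row's value per the ordering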

Related/Further Reading

  • PySpark DataFrames - way to enumerate without converting to Pandas?
  • PySpark - get row number for each row in a group
  • how to add Row id in pySpark dataframes

How to get the last row from DataFrame?

I'd simply reduce (this answer is in Scala):

df.reduce { (x, y) =>
  if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y
}
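The same idea in PySpark (a sketch, assuming an integer timestamp column) can go through the underlying RDD:

# Reduce pairwise, keeping whichever row has the larger timestamp.
last_row = df.rdd.reduce(
    lambda x, y: x if x["timestamp"] > y["timestamp"] else y
)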

How can I get the same result as Pandas' iloc in PySpark?

You can use df.limit(1000) to get 1000 rows from your dataframe. Note that Spark does not have a concept of index, so it will just return 1000 arbitrary rows. If you need a particular ordering, you can assign a row number based on a certain column and filter by the row numbers, e.g.

import pyspark.sql.functions as F
from pyspark.sql import Window

df2 = df.withColumn('rn', F.row_number().over(Window.orderBy('col_to_order'))) \
    .filter('rn <= 1000')
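To emulate an iloc-style slice such as df.iloc[100:200], the same (hypothetical) row number can be range-filtered and then dropped; row_number is 1-based, hence the shifted bounds:

df_slice = df.withColumn('rn', F.row_number().over(Window.orderBy('col_to_order'))) \
    .filter('rn > 100 and rn <= 200') \
    .drop('rn')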

How do I get the last item from a list using pyspark?

If you're using Spark >= 2.4.0, you can use the built-in element_at function, which supports negative indexing (see the sketch after the example below).

In Spark < 2.4.0, the DataFrames API didn't support -1 indexing on arrays, but you could write your own UDF or use the built-in size() function, for example:

>>> from pyspark.sql.functions import size, split
>>> splitted = df.select(split(df.s, ' ').alias('arr'))
>>> splitted.select(splitted.arr[size(splitted.arr)-1]).show()
+--------------------+
|arr[(size(arr) - 1)]|
+--------------------+
|                   d|
+--------------------+
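For Spark >= 2.4, a sketch using the built-in element_at, which accepts negative indices (reusing the arr column from above):

>>> from pyspark.sql.functions import element_at
>>> splitted.select(element_at('arr', -1).alias('last')).show()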

Subtract two rows in PySpark and append the answer as a new row

Not as easy as one would have thought because of the need to operate on rows instead of columns. Try the code below where you add one more row using union:

import pyspark.sql.functions as F

result = df.union(
    df.agg(
        F.lit('result').alias('stat'),
        *[
            (
                F.max(F.when(F.col('stat') == '75%', F.col(c))) -
                F.max(F.when(F.col('stat') == '50%', F.col(c)))
            ).alias(c)
            for c in df.columns[1:]
        ]
    )
)

result.show()
+------+-----+-----+------+-----+
|  stat|col_A|col_B| col_C|col_D|
+------+-----+-----+------+-----+
| count|   14|   14|  1414|   14|
|   75%|    4| 4001|160987|   49|
|   50%|    3| 3657|131225|   38|
|result|    1|  344| 29762|   11|
+------+-----+-----+------+-----+

Update value in specific row by checking condition for another row, PySpark

You can also select the last index's Bool value into a variable and use it in a when expression like this:

from pyspark.sql import functions as F

# or actually, if the index is always = 4 you can just filter without ordering
last_bool = df.orderBy(F.desc("index")).limit(1).select("Bool").first().Bool

df2 = df.withColumn(
    'New_Bool',
    F.when(
        F.col('index') == 1,
        F.lit(last_bool) & F.col("New_bool")  # keep New_bool or update to false if last is false
    ).otherwise(
        F.col("New_bool") & F.col("Bool")     # keep New_bool or update to false if Bool is false
    )
)

df2.show()
# +-----+-----+--------+
# |index| Bool|New_Bool|
# +-----+-----+--------+
# |    1| true|   false|
# |    2| true|    true|
# |    3|false|   false|
# |    4|false|   false|
# +-----+-----+--------+

