Pyspark Dataframes - Way to Enumerate Without Converting to Pandas

PySpark DataFrames - way to enumerate without converting to Pandas?

It doesn't work because:

  1. The second argument to withColumn must be a Column, not a collection, so an np.array won't work here.
  2. When you pass "index in indexes" to where as a SQL expression, indexes is a Python variable that is out of scope for the SQL parser, so it is not resolved as a valid identifier.
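To make the second point concrete, here is a minimal plain-Python sketch of writing the values into the expression string before Spark parses it. The helper name in_expression is hypothetical and it assumes the indexes are integers.

```python
def in_expression(column, values):
    """Render a SQL predicate such as `index in (2,5,7)` from Python integers."""
    items = [str(int(v)) for v in values]
    if not items:
        raise ValueError("cannot build an IN list from no values")
    return "{0} in ({1})".format(column, ",".join(items))


indexes = [2, 5, 7]

# The bare word `indexes` below is just text; Spark never sees the Python list.
unresolved = "index in indexes"

# Here the list contents are written into the string before Spark parses it.
resolved = in_expression("index", indexes)
print(resolved)  # index in (2,5,7)
```

The resulting string is what would be handed to where on a DataFrame that has an index column.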

PySpark >= 1.4.0

You can add row numbers using the row-number window function and then filter with the Column.isin method or a properly formatted query string:

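from pyspark.sql.functions import col, row_number  # row_number was called rowNumber in Spark 1.4 and 1.5
from pyspark.sql.window import Window

# Newer Spark versions may require an explicit ordering column here
w = Window.orderBy()
indexed = df.withColumn("index", row_number().over(w))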

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

It looks like window functions called without a PARTITION BY clause move all data to a single partition, so the above may not be the best solution after all.

Any faster and simpler way to deal with it?

Not really. Spark DataFrames don't support random row access.

A pair RDD (an RDD of key-value pairs) can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
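Why partitioning helps can be sketched with a plain-Python model. This is not Spark code: the function names are made up for illustration, and Spark uses its own partitioner and hash function. The idea is only that a key maps to exactly one partition, so a lookup can skip all the others.

```python
def partition_by_hash(pairs, num_partitions):
    """Place each (key, value) pair in the partition chosen by the key's hash."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def lookup(partitions, key):
    """Scan only the single partition that the key maps to."""
    target = partitions[hash(key) % len(partitions)]
    return [value for k, value in target if k == key]


pairs = [(i, chr(97 + i)) for i in range(15)]  # (0, 'a'), (1, 'b'), ...
parts = partition_by_hash(pairs, 4)
print(lookup(parts, 3))  # ['d']
```

Without a known partitioner, every partition would have to be scanned to find the key.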

Edit:

Independent of PySpark version you can try something like this:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested but should work and save some work later
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd  # Extract rdd
    .zipWithIndex()  # Add index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]]))  # Map to rows
    .toDF(schema))  # It will work without schema but will be more expensive

# inSet in Spark < 1.5
indexed.where(col("index").isin(indexes))
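As a rough mental model of what zipWithIndex does, here is a plain-Python sketch (not Spark code; the function name and the list-of-lists representation of partitions are assumptions made for illustration). Indexes follow partition order, so the size of every earlier partition has to be known first, which is why the real method needs to run a job when the RDD has more than one partition.

```python
from itertools import accumulate


def zip_with_index(partitions):
    """Pair every item with a global 0-based index that follows partition order."""
    sizes = [len(part) for part in partitions]    # first pass: count each partition
    offsets = [0] + list(accumulate(sizes))[:-1]  # index of each partition's first item
    return [
        [(item, offset + i) for i, item in enumerate(part)]
        for part, offset in zip(partitions, offsets)
    ]


parts = [["a", "b"], ["c"], ["d", "e", "f"]]
indexed_parts = zip_with_index(parts)

# Keep only the rows whose index is in the requested set.
wanted = {1, 3}
selected = [item for part in indexed_parts for item, i in part if i in wanted]
print(selected)  # ['b', 'd']
```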

Extract values from list within a spark dataframe without convert to pandas

For example, you can create a new column by picking an element, by index, from the list in another column.

from pyspark.sql import functions as F

# df is assumed to be an existing DataFrame with an array column named "sentence"
df = df.withColumn("selected_item", F.col("sentence").getItem(0))

Converting dataframe to dictionary in pyspark without using pandas

df2 is the dataframe from the previous post. You can do a pivot first, and then convert to dictionary as described in your linked post.

import pyspark.sql.functions as F

df3 = df2.groupBy('age').pivot('siblings').agg(F.first('count'))
list_persons = [row.asDict() for row in df3.collect()]
dict_persons = {person['age']: person for person in list_persons}

{15: {'age': 15, '0': 1.0, '1': None, '3': None}, 10: {'age': 10, '0': None, '1': None, '3': 1.0}, 14: {'age': 14, '0': None, '1': 1.0, '3': None}}
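If the None entries and the repeated age key are not wanted, the dictionary can be trimmed afterwards in plain Python. This is an optional post-processing sketch that hard-codes the result shown above as its input; the name compact is made up for illustration.

```python
# The dictionary produced above, written out as a literal for the example.
dict_persons = {
    15: {"age": 15, "0": 1.0, "1": None, "3": None},
    10: {"age": 10, "0": None, "1": None, "3": 1.0},
    14: {"age": 14, "0": None, "1": 1.0, "3": None},
}

# Drop the redundant "age" entry and every sibling count that is None.
compact = {
    age: {k: v for k, v in person.items() if k != "age" and v is not None}
    for age, person in dict_persons.items()
}
print(compact)  # {15: {'0': 1.0}, 10: {'3': 1.0}, 14: {'1': 1.0}}
```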

Or another way:

import json

df4 = df3.fillna(float('nan')).groupBy().pivot('age').agg(F.first(F.struct(*df3.columns[1:])))
# json.loads parses the JSON string without executing it, unlike eval
result_dict = json.loads(df4.select(F.to_json(F.struct(*df4.columns))).head()[0])

{'10': {'0': 'NaN', '1': 'NaN', '3': 1.0}, '14': {'0': 'NaN', '1': 1.0, '3': 'NaN'}, '15': {'0': 1.0, '1': 'NaN', '3': 'NaN'}}

Is there a way to slice dataframe based on index in pyspark?

Short Answer

If you already have an index column (suppose it was called 'id') you can filter using pyspark.sql.Column.between:

from pyspark.sql.functions import col
df.where(col("id").between(5, 10))
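Note that between is inclusive at both ends, unlike a Python slice. The plain-Python sketch below shows the off-by-one adjustment; the helper slice_to_between is hypothetical, and it assumes a contiguous 0-based integer id (an index built with a row-number window function would start at 1 instead).

```python
def slice_to_between(start, stop):
    """Convert a half-open slice [start, stop) into inclusive between() bounds."""
    if stop <= start:
        raise ValueError("empty slice")
    return start, stop - 1


# Stand-in rows with a 0-based id, mimicking a DataFrame with an index column.
rows = [{"id": i, "char": chr(97 + i)} for i in range(15)]

lower, upper = slice_to_between(5, 10)
selected = [r["char"] for r in rows if lower <= r["id"] <= upper]
print(selected)  # ['f', 'g', 'h', 'i', 'j']
```

The same bounds would be passed as col("id").between(lower, upper).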

If you don't already have an index column, you can add one yourself and then use the code above. This requires an ordering defined by some other column(s) in your data (orderBy("someColumn")).


Full Explanation

No, it is not easily possible to slice a Spark DataFrame by index unless the index is already present as a column.

Spark DataFrames are inherently unordered and do not support random access. (There is no concept of a built-in index as there is in pandas.) Each row is treated as an independent collection of structured data, and that is what allows for distributed parallel processing. Thus, any executor can take any chunk of the data and process it without regard for the order of the rows.

It is of course possible to perform operations that do involve ordering (lead, lag, etc.), but these will be slower because they require Spark to shuffle data between the executors. (The shuffling of data is typically one of the slowest components of a Spark job.)

Related/Further Reading

  • PySpark DataFrames - way to enumerate without converting to Pandas?
  • PySpark - get row number for each row in a group
  • how to add Row id in pySpark dataframes

Pyspark convert a standard list to data frame

This approach uses less code, avoids serialization to an RDD, and is likely easier to understand:

from pyspark.sql.types import IntegerType

# notice the variable name (more below)
mylist = [1, 2, 3, 4]

# notice the parens after the type name
spark.createDataFrame(mylist, IntegerType()).show()

NOTE: About naming your variable list: list is a Python builtin type, and it is strongly recommended to avoid using builtin names for variables because doing so shadows things like the list() constructor. When prototyping something quick and dirty, many people use a name like mylist instead.
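A small sketch of what goes wrong when the name is reused. The function name shadowing_demo is made up for illustration, and the shadowing is kept inside a function so that it does not leak into the rest of the program.

```python
def shadowing_demo():
    """Show the error raised once the name `list` refers to a list object."""
    list = [1, 2, 3, 4]  # the local name now hides the builtin type
    try:
        list("abc")      # tries to call the list object, not the builtin
    except TypeError as exc:
        return str(exc)
    return None


message = shadowing_demo()
print(message)  # 'list' object is not callable

# Outside the function the builtin is untouched.
print(list("abc"))  # ['a', 'b', 'c']
```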


