PySpark DataFrames - way to enumerate without converting to Pandas?
It doesn't work because:
- the second argument of withColumn should be a Column, not a collection. np.array won't work here.
- when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier.
You can add row numbers using the respective window function and query using the Column.isin
method or a properly formatted query string:
from pyspark.sql.functions import col, lit, row_number  # rowNumber in Spark < 1.6
from pyspark.sql.window import Window
# row_number() requires an ordered window; lit(1) gives an arbitrary
# (non-deterministic) order if no natural ordering column exists
w = Window.orderBy(lit(1))
indexed = df.withColumn("index", row_number().over(w))
# Using DSL
indexed.where(col("index").isin(set(indexes)))
# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
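As a plain-Python check, the SQL string built above comes out like this (`indexes` is a hypothetical list standing in for the one from the question):

```python
# Build the same "index in (...)" predicate string used with where() above
indexes = [1, 5, 7]
expr = "index in ({0})".format(",".join(str(x) for x in indexes))
print(expr)  # index in (1,5,7)
```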
It looks like window functions called without a PARTITION BY
clause move all data to a single partition, so the above may not be the best solution after all.
Any faster and simpler way to deal with it?
Not really. Spark DataFrames don't support random row access. A PairRDD
can be accessed using the lookup
method, which is relatively fast if the data is partitioned using a HashPartitioner.
There is also the indexed-rdd project, which supports efficient lookups.
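To see why partitioning by key makes lookup cheap, here is a plain-Python sketch of hash partitioning; the partition count and data are illustrative, not Spark's actual internals:

```python
# Each key hashes to exactly one partition, so a lookup only scans that bucket
num_partitions = 4
data = [(i, chr(97 + i)) for i in range(8)]  # (0, 'a') .. (7, 'h')

buckets = [[] for _ in range(num_partitions)]
for k, v in data:
    buckets[hash(k) % num_partitions].append((k, v))  # HashPartitioner-style placement

def lookup(key):
    # Scan only the bucket that owns the key's hash, not the whole dataset
    return [v for k, v in buckets[hash(key) % num_partitions] if k == key]

print(lookup(3))  # ['d']
```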
Edit:
Independent of PySpark version you can try something like this:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
row = Row("char")
row_with_index = Row("char", "index")
df = sc.parallelize([row(chr(x)) for x in range(97, 112)]).toDF()
df.show(5)
## +----+
## |char|
## +----+
## | a|
## | b|
## | c|
## | d|
## | e|
## +----+
## only showing top 5 rows
# This part is not tested but should work and save some work later
schema = StructType(
df.schema.fields[:] + [StructField("index", LongType(), False)])
indexed = (df.rdd # Extract rdd
.zipWithIndex() # Add index
.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
.toDF(schema)) # It will work without schema but will be more expensive
# Use inSet instead of isin in Spark < 1.3
indexed.where(col("index").isin(indexes))
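The zipWithIndex-plus-map step above can be mimicked in plain Python to see the row shape it produces (tuples stand in for Row objects here):

```python
rows = [("a",), ("b",), ("c",)]                 # stand-ins for Row("char") values
with_index = list(zip(rows, range(len(rows))))  # mimics rdd.zipWithIndex()
# Same logic as the lambda above: append the index as a final field
mapped = [tuple(list(r) + [i]) for r, i in with_index]
print(mapped)  # [('a', 0), ('b', 1), ('c', 2)]
```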
Extract values from list within a spark dataframe without convert to pandas
You can, for example, create a new column like this by picking an element from the list in another column by index:
from pyspark.sql import functions as F
# assumes df already exists and has an array column named "sentence"
df = df.withColumn("selected_item", F.col("sentence").getItem(0))
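In plain-Python terms, getItem(0) just indexes into the array value of each row; a small sketch with illustrative data:

```python
# What F.col("sentence").getItem(0) does, applied row by row
sentences = [["hello", "world"], ["spark", "rows"]]
selected = [s[0] for s in sentences]
print(selected)  # ['hello', 'spark']
```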
Converting dataframe to dictionary in pyspark without using pandas
df2 is the dataframe from the previous post. You can do a pivot first, and then convert to a dictionary as described in your linked post.
import pyspark.sql.functions as F
df3 = df2.groupBy('age').pivot('siblings').agg(F.first('count'))
list_persons = [row.asDict() for row in df3.collect()]
dict_persons = {person['age']: person for person in list_persons}
{15: {'age': 15, '0': 1.0, '1': None, '3': None}, 10: {'age': 10, '0': None, '1': None, '3': 1.0}, 14: {'age': 14, '0': None, '1': 1.0, '3': None}}
Or another way:
df4 = df3.fillna(float('nan')).groupBy().pivot('age').agg(F.first(F.struct(*df3.columns[1:])))
result_dict = eval(df4.select(F.to_json(F.struct(*df4.columns))).head()[0])
{'10': {'0': 'NaN', '1': 'NaN', '3': 1.0}, '14': {'0': 'NaN', '1': 1.0, '3': 'NaN'}, '15': {'0': 1.0, '1': 'NaN', '3': 'NaN'}}
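One note on the second approach: since the selected value is a JSON string, json.loads from the standard library parses it just as well as eval, without executing arbitrary code. A sketch with a hypothetical payload shaped like the output above:

```python
import json

# Hypothetical fragment of the to_json output shown above
payload = '{"10": {"0": "NaN", "1": "NaN", "3": 1.0}}'
result = json.loads(payload)
print(result["10"]["3"])  # 1.0
```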
Is there a way to slice dataframe based on index in pyspark?
Short Answer
If you already have an index column (suppose it was called 'id'
) you can filter using pyspark.sql.Column.between
:
from pyspark.sql.functions import col
df.where(col("id").between(5, 10))
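Note that between is inclusive on both ends; the equivalent plain-Python predicate is:

```python
# between(5, 10) keeps ids 5..10 inclusive
ids = list(range(15))
kept = [i for i in ids if 5 <= i <= 10]
print(kept)  # [5, 6, 7, 8, 9, 10]
```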
If you don't already have an index column, you can add one yourself and then use the code above. You should have some ordering built into your data based on some other columns (orderBy("someColumn")).
Full Explanation
No, it is not easily possible to slice a Spark DataFrame by index, unless the index is already present as a column.
Spark DataFrames are inherently unordered and do not support random access. (There is no concept of a built-in index as there is in pandas). Each row is treated as an independent collection of structured data, and that is what allows for distributed parallel processing. Thus, any executor can take any chunk of the data and process it without regard for the order of the rows.
Now obviously it is possible to perform operations that do involve ordering (lead
, lag
, etc.), but these will be slower because they require Spark to shuffle data between the executors. (Shuffling data is typically one of the slowest components of a Spark job.)
Related/Further Reading
- PySpark DataFrames - way to enumerate without converting to Pandas?
- PySpark - get row number for each row in a group
- how to add Row id in pySpark dataframes
Pyspark convert a standard list to data frame
This solution is also an approach that uses less code, avoids serialization to RDD and is likely easier to understand:
from pyspark.sql.types import IntegerType
# notice the variable name (more below)
mylist = [1, 2, 3, 4]
# notice the parens after the type name
spark.createDataFrame(mylist, IntegerType()).show()
NOTE: About naming your variable list
: the term list
is a Python builtin, so it is strongly recommended to avoid using builtin names as variable names, because doing so shadows things like the list()
constructor. When prototyping something fast and dirty, a number of folks use something like mylist
instead.