Spark RDD to DataFrame python
There are two ways to convert an RDD to a DataFrame in Spark:
toDF()
createDataFrame(rdd, schema)
I will show you how to do both dynamically.
toDF()
The toDF() method converts an RDD[Row] into a DataFrame. The key point is that the Row() object accepts a **kwargs argument, so there is an easy way to do this dynamically:
from pyspark.sql import Row

# Build a dict keyed by column position, so Row() can unpack it as **kwargs
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

# Now apply it to every record
df = rdd.map(lambda x: Row(**f(x))).toDF()
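The dict-building helper can be sanity-checked outside Spark on a plain tuple — a minimal check, no cluster needed; the sample record here is made up:

```python
# Same helper as above: keys are stringified column positions
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

print(f(("Alice", 30)))  # {'0': 'Alice', '1': 30}
```

The stringified positions become the DataFrame's column names.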
This way you are able to create a DataFrame dynamically.
createDataFrame(rdd, schema)
The other way is to build a dynamic schema. How? This way:
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])
df = sqlContext.createDataFrame(rdd, schema)
This second way is cleaner. So this is how you can create DataFrames dynamically.
How to convert rdd object to dataframe in spark
SparkSession has a number of createDataFrame methods that create a DataFrame given an RDD. I imagine one of these will work for your context.
For example:
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given
schema.
Convert RDD into Dataframe in pyspark
RDD.map
takes a unary function:
rdd.map(lambda x: (x[1], x[0][0] , x[0][1])).toDF(["Index", "Name" , "Number"])
so you cannot pass a binary one. If you want to split the array:
rdd.map(lambda x: (x[1], x[0][0] , x[0][1].split(","))).toDF(["Index", "Name" , "Number"])
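The unary constraint is ordinary Python behavior, not something Spark adds; the builtin map fails the same way when handed a two-argument lambda and tuple elements:

```python
pairs = [(1, 2), (3, 4)]

# Unary: each tuple arrives as a single argument — works
print(list(map(lambda x: x[0] + x[1], pairs)))  # [3, 7]

# Binary: the lambda expects two arguments but receives one tuple — TypeError
try:
    list(map(lambda a, b: a + b, pairs))
except TypeError as e:
    print("TypeError:", e)
```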
How to convert RDD to Dataframe Spark 2.4.5 Python
Finally got success with the following code. After 8 days of digging, and with the help of a great guy on Stack Overflow (https://stackoverflow.com/users/2451763/sparker0i), we came up with the following code, which worked with Spark 2.4.5 on Databricks.
Aggregate and Convert
from pyspark.sql.functions import *
dataF = (data
    .select("OrderMonthYear",
            date_format("OrderMonthYear", 'yyyy-MM-dd').alias("dt_format"),
            "SaleAmount")
    .groupBy("dt_format")
    .sum()
    .orderBy("dt_format")
    .toDF("dt_format", "SaleAmount"))
dataF.show()
results = dataF.rdd.map(lambda r: (int(r.dt_format.replace('-', '')), r.SaleAmount))
df = spark.createDataFrame(results,("dt_format", "SaleAmount"))
display(df)
Convert DataFrame to Features and Labels
#convenience for specifying schema
from pyspark.mllib.regression import LabeledPoint
meta = df.select("dt_format", "SaleAmount").toDF("dt_format", "SaleAmount")
meta.show()
rdd1 = meta.rdd.map(lambda r: LabeledPoint(r[1], [r[0]]))
rddtodf = spark.createDataFrame(rdd1, ("dt_format", "SaleAmount"))
display(rddtodf)
Hope this will help.
Fail to convert an RDD to dataframe
You have multiple problems with this code.
The first problem, which you have probably encountered here, is the missing import of the Row class; without it, the toDF() method fails to execute and create a logical plan for your DataFrame.
The second problem occurs in the definition of the col1 column. Executing int(float('nan')) raises a ValueError, which crashes the job later, when you call an action on the DataFrame.
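That failure is easy to reproduce in isolation:

```python
# int() cannot convert NaN — this is the crash hiding inside the rows
try:
    int(float('nan'))
    raised = False
except ValueError as e:
    raised = True
    print("ValueError:", e)  # cannot convert float NaN to integer
```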
You can solve both problems, for example, like this:
items = [(1,12),(1,float('Nan')),(1,14),(1,10),(2,22),(2,20),(2,float('Nan')),
(3,300),(3,float('Nan'))]
sc = spark.sparkContext
rdd = sc.parallelize(items)
df = rdd.toDF(["id", "col1"])
If you wish to retype the columns, I'd suggest using the cast method on the specific column you want to retype. It's a safer, faster, and more stable way to change column types in a Spark DataFrame than forcing a Python type onto each row.
How to convert a rdd of pandas DataFrame to Spark DataFrame
import numpy as np
import pandas as pd

def create_df(x):
    df = pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
    return df.values.tolist()

sc.parallelize(range(5)).flatMap(create_df).toDF() \
    .write.format("parquet").save("parquet_file")
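Each call to create_df yields five rows tagged with x, so every flatMap element is a 4-item list; that shape can be verified locally before involving Spark (pandas and numpy only):

```python
import numpy as np
import pandas as pd

def create_df(x):
    # Five rows of three random values, plus a constant tag column
    df = pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
    return df.values.tolist()

rows = create_df(3)
print(len(rows), len(rows[0]))  # 5 4
```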
Spark RDD and Dataframe transformation optimisation
Whenever an action is called, the optimized DAG gets executed and memory is used as per the plan.
You can compare the execution plans to understand:
df.explain(True)
df_new.explain(True)
Creating an extra variable in between to hold the transformations does not impact memory utilization. Memory requirements will depend on data size, partition size, shuffles, etc.
Convert RDD to Dataframe in Spark Streaming Python
Since you use an RDD[str], you have to provide a matching type. For an atomic value, that is either the corresponding AtomicType:
from pyspark.sql.types import StringType, StructField, StructType
rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())
or its string description:
spark.createDataFrame(rdd, "string")
If you want to use a StructType, convert the data to tuples first:
schema = StructType([StructField("text", StringType(), True)])
spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
Of course, if you're going to just convert each batch to a DataFrame, it makes much more sense to use Structured Streaming all the way:
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())