Spark RDD to DataFrame Python

Spark RDD to DataFrame Python

There are two ways to convert an RDD to a DataFrame in Spark: toDF() and createDataFrame(rdd, schema).

I will show you how you can do that dynamically.

toDF()

The toDF() method converts an RDD[Row] to a DataFrame. The key point is that the Row() constructor accepts **kwargs, so there is an easy way to build the rows dynamically.

from pyspark.sql.types import Row

# map each record to a dict keyed by column position
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

# now build a Row from each dict and convert
df = rdd.map(lambda x: Row(**f(x))).toDF()

This way you can create the DataFrame dynamically.

createDataFrame(rdd, schema)

The other way is to create a dynamic schema, like so:

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

df = sqlContext.createDataFrame(rdd, schema)

This second way is cleaner.

So this is how you can create dataframes dynamically.
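
For example, here is a minimal end-to-end sketch of the dynamic-schema approach; the sample data, the SparkSession named spark, and the derived column count are assumptions for illustration:

from pyspark.sql.types import StructType, StructField, StringType

# hypothetical sample data: an RDD of equally sized string tuples
rdd = spark.sparkContext.parallelize([("a", "b", "c"), ("d", "e", "f")])

# build one nullable StringType column per position, named "0", "1", "2", ...
num_cols = len(rdd.first())
schema = StructType([StructField(str(i), StringType(), True) for i in range(num_cols)])

df = spark.createDataFrame(rdd, schema)
df.show()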

How to convert rdd object to dataframe in spark

SparkSession has a number of createDataFrame methods that create a DataFrame given an RDD. I imagine one of these will work for your context.

For example:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from an RDD containing Rows using the given schema.
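
In PySpark the same overload can be used roughly like this; the sample Rows and field names below are assumptions for illustration:

from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# an RDD of Rows plus an explicit schema
rdd = spark.sparkContext.parallelize([Row(name="Alice", age=1), Row(name="Bob", age=2)])
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

df = spark.createDataFrame(rdd, schema)
df.show()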

Convert RDD into Dataframe in pyspark

RDD.map takes a unary function, so you cannot pass a binary one. Index into the single tuple argument instead:

rdd.map(lambda x: (x[1], x[0][0], x[0][1])).toDF(["Index", "Name", "Number"])

If you want to split the array:

rdd.map(lambda x: (x[1], x[0][0], x[0][1].split(","))).toDF(["Index", "Name", "Number"])

How to convert RDD to Dataframe Spark 2.4.5 Python

I finally got this working with the following code.

After eight days of digging, and with the help of a great guy on Stack Overflow (https://stackoverflow.com/users/2451763/sparker0i), we came up with the following code, which works with Spark 2.4.5 on Databricks.

Aggregate and Convert

from pyspark.sql.functions import date_format

dataF = (data
    .select("OrderMonthYear", date_format("OrderMonthYear", 'yyyy-MM-dd').alias("dt_format"), "SaleAmount")
    .groupBy("dt_format").sum().orderBy("dt_format")
    .toDF("dt_format", "SaleAmount"))

dataF.show()

# convert the aggregated DataFrame back to an RDD of (int, float) tuples
results = dataF.rdd.map(lambda r: (int(r.dt_format.replace('-', '')), r.SaleAmount))
df = spark.createDataFrame(results, ("dt_format", "SaleAmount"))
display(df)


Convert DataFrame to Features and Labels

# convenience for specifying schema
from pyspark.mllib.regression import LabeledPoint

meta = df.select("dt_format", "SaleAmount").toDF("dt_format", "SaleAmount")
meta.show()

# build LabeledPoint(label, features) pairs from each row
rdd1 = meta.rdd.map(lambda r: LabeledPoint(r[1], [r[0]]))

rddtodf = spark.createDataFrame(rdd1, ("dt_format", "SaleAmount"))
display(rddtodf)

Hope this will help.

Fail to convert an RDD to dataframe

You have multiple problems with this code.

The first problem, which you have probably run into here, is the missing import of the Row class; without it, toDF() fails to execute and build a logical plan for your dataframe.

The second problem is in the definition of the col1 column: int(float('nan')) raises a ValueError, which crashes execution later, when you call an action on the dataframe.

You can solve both problems, for example, like this:

items = [(1,12),(1,float('Nan')),(1,14),(1,10),(2,22),(2,20),(2,float('Nan')),
(3,300),(3,float('Nan'))]

sc = spark.sparkContext
rdd = sc.parallelize(items)

df = rdd.toDF(["id", "col1"])

If you wish to change the column types, I'd suggest using the cast method on the specific column you want to retype. It is a safer, faster, and more stable way to change column types in a Spark dataframe than forcing a Python type on each row.
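
For instance, a minimal sketch casting col1 on the df built above (IntegerType here is just an illustrative target type):

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# cast the col1 column to integers instead of converting values row by row
df_typed = df.withColumn("col1", col("col1").cast(IntegerType()))
df_typed.printSchema()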

How to convert a rdd of pandas DataFrame to Spark DataFrame

import numpy as np
import pandas as pd

def create_df(x):
    # build a small pandas DataFrame per element and return its rows as lists
    df = pd.DataFrame(np.random.rand(5, 3)).assign(col=x)
    return df.values.tolist()

(sc.parallelize(range(5))
   .flatMap(create_df)
   .toDF()
   .write.format("parquet").save("parquet_file"))
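
As a side note, if a single pandas DataFrame already fits on the driver, spark.createDataFrame accepts it directly; the pdf below is a hypothetical example:

import numpy as np
import pandas as pd

pdf = pd.DataFrame(np.random.rand(5, 3), columns=["a", "b", "c"])

# createDataFrame can take a pandas DataFrame without going through an RDD
sdf = spark.createDataFrame(pdf)
sdf.show()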

Spark RDD and Dataframe transformation optimisation

Whenever an action is called, the optimized DAG is executed and memory is used according to the plan.
You can compare the two execution plans to see this:

df.explain(True)
df_new.explain(True)

Creating an extra variable in between to hold the transformations does not impact memory utilization. Memory requirements depend on data size, partition size, shuffles, etc.

Convert RDD to Dataframe in Spark Streaming Python

Since you use an RDD[str], you should provide a matching type. For an atomic value that is either the corresponding AtomicType:

from pyspark.sql.types import StringType, StructField, StructType

rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())

or its string description:

spark.createDataFrame(rdd, "string")

If you want to use a StructType, convert the data to tuples first:

schema = StructType([StructField("text", StringType(), True)])

spark.createDataFrame(rdd.map(lambda x: (x, )), schema)

Of course, if you're just going to convert each batch to a DataFrame, it makes much more sense to use Structured Streaming all the way:

lines = (spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())
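
From there, a minimal sketch of finishing the pipeline with a streaming sink (the console sink and append mode are just for illustration):

# write each micro-batch to the console instead of converting RDDs by hand
query = (lines
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

query.awaitTermination()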

