'PipelinedRDD' object has no attribute 'toDF' in PySpark
The toDF method is a monkey patch executed inside the SparkSession constructor (the SQLContext constructor in 1.x), so to be able to use it you have to create a SQLContext (or SparkSession) first:
# SQLContext or HiveContext in Spark 1.x
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([("a", 1)])
hasattr(rdd, "toDF")
## False
spark = SparkSession(sc)
hasattr(rdd, "toDF")
## True
rdd.toDF().show()
## +---+---+
## | _1| _2|
## +---+---+
## | a| 1|
## +---+---+
Not to mention that you need a SQLContext or SparkSession to work with DataFrames in the first place.
Converting rdd to dataframe: AttributeError: 'RDD' object has no attribute 'toDF' using PySpark
Initialize the SparkSession by passing in the SparkContext. Example:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("local").setAppName("Dataframe_examples")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

def parsedLine(line):
    fields = line.split(',')
    movieId = fields[0]
    movieName = fields[1]
    genres = fields[2]
    return movieId, movieName, genres

movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
# or using spark.sparkContext
movies = spark.sparkContext.textFile("file:///home/ajit/ml-25m/movies.csv")

parsedLines = movies.map(parsedLine)
print(parsedLines.count())

# Name all three columns; otherwise the remaining ones get default _2/_3 names
dataFrame = parsedLines.toDF(["movieId", "movieName", "genres"])
dataFrame.printSchema()
Unable to use rdd.toDF() but spark.createDataFrame(rdd) Works
The toDF method is attached when a SparkSession (an SQLContext in the 1.x versions) is constructed. So:
spark = SparkSession(sc)
hasattr(rdd, "toDF")
If you are using Scala, you need to import spark.implicits._, where spark is the SparkSession object that you created.
Hope this helps!
Pyspark: AttributeError: 'PipelinedRDD' object has no attribute '_get_object_id'
If you want to replace a certain text pattern in your file, you can try the following without using Spark; it will probably be more efficient for a small file like a SQL query.
with open('PATH/file.sql','r') as f:
lines = f.readlines()
phrase = "(Month|| '-' || '5' || '-' || year)"
replace ="('5' || '/' || month || '/' || year)"
new_lines = ''.join([i.replace(phrase,replace) for i in lines])
print(new_lines)
with open('text.sql', 'w') as f:
f.write(new_lines)
The file is read and stored as a list of lines; replace is then applied to each line and the results are joined back together. Finally, the result is written to the file you want to save it as.
PipelinedRDD' object has no attribute 'sparkSession' when creating dataframe in pyspark
There is no need to use both SparkContext and SparkSession to initialize Spark. Since SparkSession is the newer, recommended way, use that:
spark = SparkSession\
.builder\
.config("spark.executor.memory","1g")\
.config("spark.cores.max","2")\
.appName("name")\
.getOrCreate()
createDataFrame can then be accessed by:
prueba_2 = spark.createDataFrame(...)
If you have to use the underlying SparkContext, you can simply do spark.sparkContext.
pyspark AttributeError: 'DataFrame' object has no attribute 'toDF'
I figured it out. It looks like it has to do with our Spark version; it worked with 1.6.