How do I add headers to a PySpark DataFrame?
Like this ... you need to specify a schema and set .option("header", "false") if your CSV does not contain a header row:
spark.version
'2.3.2'
! cat sample.csv
1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"
PATH = "sample.csv"
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", FloatType(), True),
    StructField("col3", StringType(), True)])
csvFile = spark.read.format("csv")\
    .option("header", "false")\
    .schema(schema)\
    .load(PATH)
csvFile.show()
+----+----+---------------+
|col1|col2| col3|
+----+----+---------------+
| 1| 2.0| hello|
| 3| 4.0| "there"|
| 5| 6.0| "how are you?"|
+----+----+---------------+
# if you have an RDD and want to convert straight to a DataFrame
rdd = sc.textFile(PATH)
# just showing rows
for i in rdd.collect(): print(i)
1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"
# use Row to construct a schema from the RDD
from pyspark.sql import Row
csvDF = rdd\
    .map(lambda x: x.split(","))\
    .map(lambda x: Row(col1=int(x[0]), col2=float(x[1]), col3=str(x[2])))\
    .toDF()
csvDF.show()
+----+----+---------------+
|col1|col2| col3|
+----+----+---------------+
| 1| 2.0| "hello"|
| 3| 4.0| "there"|
| 5| 6.0| "how are you?"|
+----+----+---------------+
csvDF.printSchema()
root
|-- col1: long (nullable = true)
|-- col2: double (nullable = true)
|-- col3: string (nullable = true)
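One caveat with the split(",") approach above: it breaks as soon as a quoted field contains a comma. A minimal pure-Python sketch of a safer line parser using the standard csv module (parse_line is a hypothetical helper name, not part of the original answer):

```python
import csv
from io import StringIO

def parse_line(line):
    # csv.reader handles quoted fields, so a comma inside
    # "hello, world" is not treated as a separator;
    # skipinitialspace drops the blanks after each comma
    fields = next(csv.reader(StringIO(line), skipinitialspace=True))
    return int(fields[0]), float(fields[1]), fields[2]

print(parse_line('1, 2.0,"hello, world"'))
# → (1, 2.0, 'hello, world')
```

In the RDD example this could replace the manual splitting, e.g. rdd.map(parse_line), before building the Row objects.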
How to Set Pyspark Dataframe Headers to another Row?
Assuming that there is only one row with id in col1, name in col2 and val in col3, you can use the following logic (commented for clarity and explanation):
# select the row holding the header names
header = df.filter((df['col1'] == 'id') & (df['col2'] == 'name') & (df['col3'] == 'val'))
# select the rest of the rows, excluding the header row
restDF = df.subtract(header)
# convert the header row into a Row
headerColumn = header.first()
# loop over the columns to rename them
for column in restDF.columns:
    restDF = restDF.withColumnRenamed(column, headerColumn[column])
restDF.show(truncate=False)
this should give you
+---+----+---+
|id |name|val|
+---+----+---+
|1 |a01 |X |
|2 |a02 |Y |
+---+----+---+
But the best option would be to read the file with the header option set to true when creating the DataFrame from the source in the first place.
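The header-promotion logic above (find the header row, drop it, use its values as column names) can be sketched outside Spark as well; a minimal pure-Python version, assuming the data is a small list of tuples with made-up values:

```python
# plain-Python sketch of the same header-promotion idea:
# locate the row holding the header names, remove it, and
# use its values as the column names (data is illustrative)
rows = [('id', 'name', 'val'), ('1', 'a01', 'X'), ('2', 'a02', 'Y')]

header = next(r for r in rows if r == ('id', 'name', 'val'))
rest = [r for r in rows if r != header]
records = [dict(zip(header, r)) for r in rest]
print(records)
# → [{'id': '1', 'name': 'a01', 'val': 'X'}, {'id': '2', 'name': 'a02', 'val': 'Y'}]
```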
Add column names to data read from csv file without column names
You can use toDF to specify column names when reading the CSV file:
val df = spark.read.option("inferSchema","true").csv("../myfile.csv").toDF(
"ID", "name", "age", "numOfFriends"
)
Or, if you already have the DataFrame created, you can rename its columns as follows:
val newColNames = Seq("ID", "name", "age", "numOfFriends")
val df2 = df.toDF(newColNames: _*)
Take column names from old dataframe in Spark Scala
.toDF accepts (colNames: String*); we can expand a Seq[String] into varargs with :_*
Example:
val featureCols = Seq("Id", "Name", "City")
val someDF = Seq((4, "Ahmad", "swl")).toDF(featureCols: _*)
Seq(("1", "2", "3")).toDF(featureCols: _*).show()
//+---+----+----+
//| Id|Name|City|
//+---+----+----+
//| 1| 2| 3|
//+---+----+----+
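Scala's featureCols: _* varargs expansion has a direct analogue in Python's * unpacking, which is how the PySpark side calls df.toDF(*cols). A tiny sketch with a stand-in function (to_df here is a hypothetical helper, not the real DataFrame method):

```python
# Python's * unpacking plays the role of Scala's ":_*":
# it expands a list into separate positional arguments
feature_cols = ["Id", "Name", "City"]

def to_df(*col_names):
    # stand-in for DataFrame.toDF; just echoes the names it received
    return list(col_names)

print(to_df(*feature_cols))
# → ['Id', 'Name', 'City']
```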
How to add Kafka headers for a Kafka output in Spark Structured Streaming, making them from Dataframe columns?
I had to create a tuple column using struct() and then array_union() to merge it with the preexisting array of header tuples.
.withColumn(kafkaTimestampColumnName, col("timestamp"))
.withColumn("tupletime", struct(lit(kafkaTimestampColumnName) as "key", (unix_timestamp(col(kafkaTimestampColumnName), "yyyy-MM-dd hh:mm:ss").cast("string")).cast("binary") as "value"))
.withColumn("headers", array_union(col("headers"), array(col("tupletime"))))
Please note that in the tuple the key should be a string while the value should be binary.
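Kafka record headers are (String, byte[]) pairs, which is exactly the shape the struct() column above builds. For intuition, the same shape in pure Python looks like this (header names and values here are purely illustrative):

```python
import time

# Kafka headers are (key: str, value: bytes) pairs; the struct()
# column in the snippet above builds one such pair, and
# array_union appends it to the existing header list
headers = [("trace-id", b"abc123")]

ts = str(int(time.time()))
headers = headers + [("kafkaTimestamp", ts.encode())]  # mirrors array_union
print(headers)
```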