How to Assign and Use Column Headers in Spark

How do I add headers to a PySpark DataFrame?

Like this: you need to specify a schema and set .option("header", "false") if your CSV does not contain a header row.

spark.version
'2.3.2'

! cat sample.csv

1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

PATH = "sample.csv"

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", FloatType(), True),
    StructField("col3", StringType(), True)
])

csvFile = spark.read.format("csv") \
    .option("header", "false") \
    .schema(schema) \
    .load(PATH)

csvFile.show()

+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|          hello|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

# if you have an RDD and want to convert it straight to a DataFrame
rdd = sc.textFile(PATH)

# just showing the rows
for i in rdd.collect(): print(i)

1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

# use Row to build named columns from the RDD, then convert to a DataFrame
from pyspark.sql import Row

csvDF = rdd.map(lambda x: Row(col1=int(x.split(",")[0]),
                              col2=float(x.split(",")[1]),
                              col3=str(x.split(",")[2]))).toDF()

csvDF.show()

+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|        "hello"|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

csvDF.printSchema()

root
 |-- col1: long (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: string (nullable = true)
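
Note that toDF() here infers the column types from the Python objects (int becomes long, float becomes double), which is why the printed schema differs from the StructType defined earlier. A minimal sketch, if you want the declared IntegerType/FloatType instead: parse each line into a plain tuple and pass the schema to createDataFrame (csvDF2 is a hypothetical name):

# parse each line into a plain tuple, then apply the explicit schema
parsed = rdd.map(lambda x: (int(x.split(",")[0]),
                            float(x.split(",")[1]),
                            str(x.split(",")[2])))
csvDF2 = spark.createDataFrame(parsed, schema)
csvDF2.printSchema()  # col1: integer, col2: float, col3: string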

How to Set PySpark DataFrame Headers to Another Row?

Assuming there is only one row with id in col1, name in col2, and val in col3, you can use the following logic (commented for clarity and explanation):

# select the row that contains the header names
header = df.filter((df['col1'] == 'id') & (df['col2'] == 'name') & (df['col3'] == 'val'))

# select the rest of the rows, i.e. everything except the header row
restDF = df.subtract(header)

# convert the header row into a Row object
headerColumn = header.first()

# loop over the columns, renaming each one to the value from the header row
for column in restDF.columns:
    restDF = restDF.withColumnRenamed(column, headerColumn[column])

restDF.show(truncate=False)

This should give you:

+---+----+---+
|id |name|val|
+---+----+---+
|1  |a01 |X  |
|2  |a02 |Y  |
+---+----+---+

But the best option would be to read the data with the header option set to true while loading the DataFrame from the source in the first place.
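
For example, a minimal sketch, assuming the source is a CSV file with a header row (the path "data.csv" is hypothetical):

# the first row of the file is used directly as the column names
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")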

Add column names to data read from csv file without column names

You can use toDF to specify column names when reading the CSV file:

val df = spark.read.option("inferSchema", "true").csv("../myfile.csv").toDF(
  "ID", "name", "age", "numOfFriends"
)

Or, if you already have the DataFrame created, you can rename its columns as follows:

val newColNames = Seq("ID", "name", "age", "numOfFriends")
val df2 = df.toDF(newColNames: _*)

Take column names from old dataframe in Spark Scala

.toDF accepts (colNames: String*), so we can unpack a Seq[String] into varargs with : _*

Example:

val featureCols = Seq("Id", "Name", "City")
val someDF = Seq((4, "Ahmad", "swl")).toDF(featureCols: _*)

Seq(("1","2","3")).toDF(featureCols:_*).show()
//+---+----+----+
//| Id|Name|City|
//+---+----+----+
//| 1| 2| 3|
//+---+----+----+

How to add Kafka headers for a Kafka output in Spark Structured Streaming, making them from Dataframe columns?

I had to create a tuple column using struct() and then merge it with the preexisting array of header tuples using array_union().

.withColumn(kafkaTimestampColumnName, col("timestamp"))
.withColumn("tupletime", struct(
  lit(kafkaTimestampColumnName) as "key",
  unix_timestamp(col(kafkaTimestampColumnName), "yyyy-MM-dd hh:mm:ss").cast("string").cast("binary") as "value"
))
.withColumn("headers", array_union(col("headers"), array(col("tupletime"))))

Please note that within each header tuple the key must be a string while the value must be binary.
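
A minimal PySpark sketch of the same idea (the answer above is Scala). Kafka header support in Structured Streaming requires Spark 3.0+; df is assumed to already have a timestamp column and a preexisting headers array column, as in the answer:

from pyspark.sql.functions import array, array_union, col, lit, struct

df_out = (df
    .withColumn("tupletime", struct(
        lit("timestamp").alias("key"),                                 # header key: string
        col("timestamp").cast("string").cast("binary").alias("value")  # header value: binary
    ))
    .withColumn("headers", array_union(col("headers"), array(col("tupletime")))))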


