How to Add a New Column to a Spark DataFrame (Using PySpark)

How do I add a new column to a Spark DataFrame (using PySpark)?

You cannot add an arbitrary column to a DataFrame in Spark. New columns can only be created by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?):

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

by transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

by including data using a join:

from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or by generating it with a function / UDF:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
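
For illustration, here is a rough sketch (reusing the df defined in the first example) of the same transformation written both ways; the built-in version stays inside the JVM as a Catalyst expression, while the UDF version ships every value through a Python worker:

from pyspark.sql.functions import exp, udf
from pyspark.sql.types import DoubleType
import math

# built-in function: evaluated as a Catalyst expression
df_builtin = df.withColumn("x5", exp("x3"))

# equivalent Python UDF: each value is serialized to a Python worker and back
exp_udf = udf(lambda v: math.exp(v) if v is not None else None, DoubleType())
df_udf = df.withColumn("x5", exp_udf("x3"))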

If you want to add the content of an arbitrary RDD as a column, you can (see the sketch after this list):

  • add row numbers to the existing DataFrame
  • call zipWithIndex on the RDD and convert it to a DataFrame
  • join both using the index as a join key
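
A rough sketch of that approach, assuming df is the DataFrame from the examples above and values_rdd is a placeholder RDD holding the new values (x8 is just an illustrative column name):

from pyspark.sql import Row

# 1. add row numbers to the existing DataFrame
df_indexed = (df.rdd
    .zipWithIndex()
    .map(lambda pair: Row(idx=pair[1], **pair[0].asDict()))
    .toDF())

# 2. index the RDD and convert it to a DataFrame
values_df = (values_rdd
    .zipWithIndex()
    .map(lambda pair: Row(idx=pair[1], x8=pair[0]))
    .toDF())

# 3. join both using the index as a join key, then drop it
df_with_x8 = df_indexed.join(values_df, "idx").drop("idx")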

Adding a new column to a Spark DataFrame by looking up data from another DataFrame

What you're looking for is fairly straightforward; it just takes several steps and can get confusing if you don't structure it carefully. This answer is essentially the same as the others, just better organized. Each step is commented, and you can run the steps one at a time to follow the logic.

from pyspark.sql import functions as F

(df
    .withColumn('error_column', F.explode(F.split('error_column', ','))) # break multiple errors out into separate rows
    .join(look_up_df.withColumnRenamed('column_nm', 'error_column'), on=['error_column'], how='inner') # rename the lookup column so the `on` condition stays short
    .withColumn('error_desc', F.concat( # build the description as required
        F.col('error_column'),
        F.lit(' expected '),
        F.col('clmn1'),
        F.lit(' and comment is '),
        F.col('comment'),
    ))
    .groupBy('col1', 'col2') # now that the errors are broken out, group them back together
    .agg(
        F.concat_ws(', ', F.collect_list('error_column')).alias('error_column'), # re-join the errors, comma separated
        F.concat_ws(', ', F.collect_list('error_desc')).alias('error_desc'), # re-join the descriptions, comma separated
    )
    .show(10, False)
)

+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|col1|col2|error_column |error_desc |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|1 |4 |date_from, date_to|date_from expected DD-MM-YY and comment is text msg1, date_to expected DD-MM-YY and comment is test msg2|
|1 |8 |emp_name |emp_name expected VARCHAR(100) and comment is test msg3 |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
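
For reference, the snippet above assumes input DataFrames shaped roughly like this (reverse-engineered from the output shown, purely illustrative; spark is an active SparkSession):

df = spark.createDataFrame(
    [(1, 4, 'date_from,date_to'), (1, 8, 'emp_name')],
    ('col1', 'col2', 'error_column'))

look_up_df = spark.createDataFrame(
    [('date_from', 'DD-MM-YY', 'text msg1'),
     ('date_to', 'DD-MM-YY', 'test msg2'),
     ('emp_name', 'VARCHAR(100)', 'test msg3')],
    ('column_nm', 'clmn1', 'comment'))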

PySpark - how to add a column to a Spark DataFrame from a list

You could try something like:

import pyspark.sql.functions as F

list_example = [1,3,5,7,8]
new_df = df.withColumn("new_column", F.array( [F.lit(x) for x in list_example] ))
new_df.show()
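
Note that this puts the entire list into a single array-typed column that is repeated on every row; it does not pair list elements with rows one-to-one. If you need the latter, join on an index instead, as sketched in the zipWithIndex approach above.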

Adding a new column to a Spark DataFrame

If you want to add a new incremental column to a DataFrame, you could do it as follows.

df.show()
+-------+
|   name|
+-------+
|gaurnag|
+-------+
from pyspark.sql.functions import monotonically_increasing_id
new_df = df.withColumn("id", monotonically_increasing_id())
new_df.show()
+-------+---+
|   name| id|
+-------+---+
|gaurnag|  0|
+-------+---+
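
Keep in mind that monotonically_increasing_id() only guarantees increasing, unique values, not consecutive ones; on a DataFrame with more than one partition the generated ids will typically have large gaps.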

Add new column based on existing column with concat values Spark dataframe

Use the row_number window function with monotonically_increasing_id().

from pyspark.sql import *
from pyspark.sql.functions import *
w = Window.orderBy(monotonically_increasing_id())
df.withColumn("new_col",concat(split(col("my_string")," ")[0], lpad(row_number().over(w),2,"0"))).show()

#+---------+-------+
#|my_string|new_col|
#+---------+-------+
#|2020 test| 202001|
#|2020 prod| 202002|
#| 2020 dev| 202003|
#+---------+-------+
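
Note that a Window ordered without a partitionBy moves all the data into a single partition, so this is fine for small DataFrames but can become a bottleneck on large ones.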

UPDATE:

Use a when + otherwise expression.

df.withColumn("dyn_col",when(lower(split(col("my_string")," ")[1]) =="prod","kk").\
when(lower(split(col("my_string")," ")[1]) =="dev","ff").\
when(lower(split(col("my_string")," ")[1]) =="test","01").\
otherwise("null")).\
withColumn("new_col",concat(split(col("my_string")," ")[0], col("dyn_col"))).\
drop("dyn_col").\
show()
#+---------+-------+
#|my_string|new_col|
#+---------+-------+
#|2020 test| 202001|
#|2020 prod| 2020kk|
#| 2020 dev| 2020ff|
#+---------+-------+

In Scala:

df.withColumn("dyn_col",when(lower(split(col("my_string")," ")(1)) ==="prod","kk").
when(lower(split(col("my_string")," ")(1)) ==="dev","ff").
when(lower(split(col("my_string")," ")(1)) ==="test","01").
otherwise("null")).
withColumn("new_col",concat(split(col("my_string")," ")(0), col("dyn_col"))).
drop("dyn_col").
show()

//+---------+-------+
//|my_string|new_col|
//+---------+-------+
//|2020 test| 202001|
//|2020 prod| 2020kk|
//| 2020 dev| 2020ff|
//+---------+-------+

How to create new string column in PySpark DataFrame based on values of other columns?

You can use the concat function or format_string like this:

from pyspark.sql import functions as F

df = df.withColumn(
    "New",
    F.format_string("Hey there %s %s!", "Name", "Surname")
)

df.show(truncate=False)
# +---+----+-------+-----------------------+
# |Id |Name|Surname|New                    |
# +---+----+-------+-----------------------+
# |1  |John|Johnson|Hey there John Johnson!|
# |2  |Anna|Maria  |Hey there Anna Maria!  |
# +---+----+-------+-----------------------+

If you prefer using concat:

F.concat(F.lit("Hey there "), F.col("Name"), F.lit(" "), F.col("Surname"), F.lit("!"))
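
Keep in mind that concat returns null for the whole result if any of its inputs is null; wrap nullable columns in coalesce (or use concat_ws, which skips nulls) if that is not what you want.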

