How do I add a new column to a Spark DataFrame (using PySpark)?
You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?):
from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
By transforming an existing column:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
Columns can also be included using a join:
from pyspark.sql.functions import col
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
Or generated with a function / UDF:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
If you want to add the content of an arbitrary RDD as a column, you can:
- add row numbers to the existing data frame
- call zipWithIndex on the RDD and convert it to a data frame
- join both using the index as a join key
Adding new column to spark dataframe by getting data lookup from other dataframe
Well, what you're looking for is quite straightforward: it just takes multiple steps and can be confusing if not written carefully. This answer is essentially similar to the other one, but with a clearer structure. I added comments on each step, but feel free to run each of them to follow the logic.
from pyspark.sql import functions as F

(df
    .withColumn('error_column', F.explode(F.split('error_column', ',')))  # break multiple errors into separate rows
    .join(look_up_df.withColumnRenamed('column_nm', 'error_column'),
          on=['error_column'], how='inner')  # rename the lookup column so the `on` condition stays short
    .withColumn('error_desc', F.concat(  # build the description as required
        F.col('error_column'),
        F.lit(' expected '),
        F.col('clmn1'),
        F.lit(' and comment is '),
        F.col('comment'),
    ))
    .groupBy('col1', 'col2')  # having broken the errors apart, join the rows back together
    .agg(
        F.concat_ws(', ', F.collect_list('error_column')).alias('error_column'),  # comma-separated errors
        F.concat_ws(', ', F.collect_list('error_desc')).alias('error_desc'),  # comma-separated descriptions
    )
    .show(10, False)
)
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|col1|col2|error_column |error_desc |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|1 |4 |date_from, date_to|date_from expected DD-MM-YY and comment is text msg1, date_to expected DD-MM-YY and comment is test msg2|
|1 |8 |emp_name |emp_name expected VARCHAR(100) and comment is test msg3 |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
pyspark- how to add a column to spark dataframe from a list
You could try something like:
import pyspark.sql.functions as F
list_example = [1,3,5,7,8]
new_df = df.withColumn("new_column", F.array( [F.lit(x) for x in list_example] ))
new_df.show()
Adding a new column to a spark dataframe
If you want to add a new incremental column to a DataFrame, you can do it as follows.
df.show()
+-------+
| name|
+-------+
|gaurnag|
+-------+
from pyspark.sql.functions import monotonically_increasing_id
new_df = df.withColumn("id", monotonically_increasing_id())
new_df.show()
+-------+---+
| name| id|
+-------+---+
|gaurnag| 0|
+-------+---+
Add new column based on existing column with concat values Spark dataframe
Use the row_number window function with monotonically_increasing_id():
from pyspark.sql import *
from pyspark.sql.functions import *
w = Window.orderBy(monotonically_increasing_id())
df.withColumn("new_col",concat(split(col("my_string")," ")[0], lpad(row_number().over(w),2,"0"))).show()
#+---------+-------+
#|my_string|new_col|
#+---------+-------+
#|2020 test| 202001|
#|2020 prod| 202002|
#| 2020 dev| 202003|
#+---------+-------+
UPDATE: Use a when + otherwise statement:
df.withColumn("dyn_col",when(lower(split(col("my_string")," ")[1]) =="prod","kk").\
when(lower(split(col("my_string")," ")[1]) =="dev","ff").\
when(lower(split(col("my_string")," ")[1]) =="test","01").\
otherwise("null")).\
withColumn("new_col",concat(split(col("my_string")," ")[0], col("dyn_col"))).\
drop("dyn_col").\
show()
#+---------+-------+
#|my_string|new_col|
#+---------+-------+
#|2020 test| 202001|
#|2020 prod| 2020kk|
#| 2020 dev| 2020ff|
#+---------+-------+
In Scala:
df.withColumn("dyn_col",when(lower(split(col("my_string")," ")(1)) ==="prod","kk").
when(lower(split(col("my_string")," ")(1)) ==="dev","ff").
when(lower(split(col("my_string")," ")(1)) ==="test","01").
otherwise("null")).
withColumn("new_col",concat(split(col("my_string")," ")(0), col("dyn_col"))).
drop("dyn_col").
show()
//+---------+-------+
//|my_string|new_col|
//+---------+-------+
//|2020 test| 202001|
//|2020 prod| 2020kk|
//| 2020 dev| 2020ff|
//+---------+-------+
How to create new string column in PySpark DataFrame based on values of other columns?
You can use the concat function or format_string like this:
from pyspark.sql import functions as F
df = df.withColumn(
"New",
F.format_string("Hey there %s %s!", "Name", "Surname")
)
df.show(truncate=False)
# +---+----+-------+-----------------------+
# |Id |Name|Surname|New |
# +---+----+-------+-----------------------+
# |1 |John|Johnson|Hey there John Johnson!|
# |2 |Anna|Maria |Hey there Anna Maria! |
# +---+----+-------+-----------------------+
If you prefer using concat:
F.concat(F.lit("Hey there "), F.col("Name"), F.lit(" "), F.col("Surname"), F.lit("!"))