How to Add a Constant Column in a Spark DataFrame

How to add a constant column in a Spark DataFrame?

Spark 2.2+

Spark 2.2 introduces typedLit to support Seq, Map, and Tuples (SPARK-19254), and the following calls should be supported (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map):

The second argument for DataFrame.withColumn should be a Column, so you have to use a literal:

from pyspark.sql.functions import lit

df.withColumn('new_column', lit(10))

If you need complex columns, you can build them using building blocks like array:

from pyspark.sql.functions import array, create_map, struct

df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Exactly the same methods can be used in Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}

df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

To provide names for structs, use either alias on each field:

df.withColumn(
    "some_struct",
    struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
)

or cast on the whole object:

df.withColumn(
    "some_struct",
    struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
)

It is also possible, although slower, to use a UDF.
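
For example, a minimal PySpark sketch of that approach (the zero-argument UDF here is purely illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# A UDF that takes no input columns and always returns 10. It works, but every
# row is routed through Python, so lit(10) is the better choice for plain constants.
ten = udf(lambda: 10, IntegerType())

df.withColumn('new_column', ten())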

Note:

The same constructs can be used to pass constant arguments to UDFs or SQL functions.
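
For instance, as a rough sketch (the pad UDF and the some_string column are hypothetical names, not from the question):

from pyspark.sql.functions import concat, lit, udf
from pyspark.sql.types import StringType

# Hypothetical UDF taking a column plus a constant argument; the constant has to
# be wrapped in lit() so it is passed as a Column rather than a plain Python object.
pad = udf(lambda s, suffix: (s or '') + suffix, StringType())

df.withColumn('padded', pad(df['some_string'], lit('_x')))
# The same lit() wrapping works for built-in SQL functions:
df.withColumn('padded', concat(df['some_string'], lit('_x')))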

Adding constant value column to spark dataframe

You need to import lit.

Either

from pyspark.sql.functions import *

will make lit available,

or something like:

import pyspark.sql.functions as sf
wamp = wamp.withColumn('region', sf.lit('NE'))

Adding a constant value to columns in a Spark dataframe

You can use the lit and concat functions:

import org.apache.spark.sql.functions._

// add suffix to all but last column (would work for any number of cols):
val colsWithSuffix = df.columns.dropRight(1).map(c => concat(col(c), lit("del")) as c)
val result = df.select(colsWithSuffix :+ col("age"): _*)

result.show()
// +----+---------+---+
// |  id|   person|age|
// +----+---------+---+
// |1del|naveendel| 24|
// +----+---------+---+

EDIT: to also accommodate null values, you can wrap the column with coalesce before appending the suffix - replace the line calculating colsWithSuffix with:

val colsWithSuffix = df.columns.dropRight(1)
.map(c => concat(coalesce(col(c), lit("")), lit("del")) as c)

How do I add a new column to a Spark DataFrame (using PySpark)?

You cannot add an arbitrary column to a DataFrame in Spark. New columns can only be created in a few ways, for example using literals (other literal types are described in How to add a constant column in a Spark DataFrame?):

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

including a column from another DataFrame using join:

from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with a function / UDF:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
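
A Python UDF can be used the same way; a rough sketch (the to_upper function is just illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Illustrative UDF deriving a new column from the existing x2 column
to_upper = udf(lambda s: s.upper() if s is not None else None, StringType())

df_with_x8 = df_with_x7.withColumn("x8", to_upper("x2"))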

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
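
For example, the x8 column sketched above could be produced with the built-in upper instead of a Python UDF, keeping the whole computation inside the JVM (again only a sketch, reusing the names from the previous snippet):

from pyspark.sql.functions import upper

# Same result as the to_upper UDF, but evaluated as a Catalyst expression
df_with_x8 = df_with_x7.withColumn("x8", upper("x2"))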

If you want to add the content of an arbitrary RDD as a column, you can (see the sketch after this list):

  • add row numbers to existing data frame
  • call zipWithIndex on RDD and convert it to data frame
  • join both using index as a join key
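
A rough sketch of that approach, assuming a hypothetical RDD some_rdd whose elements line up one-to-one, in order, with the rows of df:

from pyspark.sql import Row

# Index both sides, join on the index, then drop it again
df_indexed = df.rdd.zipWithIndex().map(
    lambda row_idx: Row(idx=row_idx[1], **row_idx[0].asDict())).toDF()
rdd_indexed = some_rdd.zipWithIndex().map(
    lambda val_idx: Row(idx=val_idx[1], new_col=val_idx[0])).toDF()

df_with_new_col = df_indexed.join(rdd_indexed, "idx").drop("idx")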

How to add a new column with a constant DenseVector to a pyspark dataframe?

You need to pass an ArrayType column when you call your UDF, not a DenseVector, and the add_cons_dense_col function needs to return a DenseVector:

from pyspark.ml.linalg import DenseVector, VectorUDT
import pyspark.sql.functions as F

@F.udf(returnType=VectorUDT())
def add_cons_dense_col(val):
    return DenseVector(val)

df.withColumn('ttt', add_cons_dense_col(F.array(F.lit(1.), F.lit(0.)))).show()

#+---+---+---------+
#| _1| _2|      ttt|
#+---+---+---------+
#|  1|  2|[1.0,0.0]|
#|  3|  4|[1.0,0.0]|
#|  5|  6|[1.0,0.0]|
#|  7|  8|[1.0,0.0]|
#+---+---+---------+

To create an array column from a Python list:

F.array(*[F.lit(x) for x in [1., 0., 3., 5.]])

PySpark: add a row from one dataframe as a column with constant value to another dataframe

It depends; if it's only one row, you may as well just cross join. Keep in mind that this can be quite expensive if multiple rows are involved:

df2.crossJoin(df1.select("values")).show()

pyspark equivalent of adding a constant array to a dataframe as column

You can import array from the functions module:

>>> from pyspark.sql.functions import array, lit

>>> tag = array(lit("oracle"), lit("java"))
>>> df2.withColumn("tags", tag).show()

Tested below

>>> from pyspark.sql.functions import array, lit

>>> tag = array(lit("oracle"), lit("java"))
>>>
>>> ranked.withColumn("tag", tag).show()
+------+--------------+----------+-----+----+----+--------------+
|gender|    ethinicity|first_name|count|rank|year|           tag|
+------+--------------+----------+-----+----+----+--------------+
|  MALE|      HISPANIC|    JAYDEN|  364|   1|2012|[oracle, java]|
|  MALE|WHITE NON HISP|    JOSEPH|  300|   2|2012|[oracle, java]|
|  MALE|WHITE NON HISP|    JOSEPH|  300|   2|2012|[oracle, java]|
|  MALE|      HISPANIC|     JACOB|  293|   4|2012|[oracle, java]|
|  MALE|      HISPANIC|     JACOB|  293|   4|2012|[oracle, java]|
|  MALE|WHITE NON HISP|     DAVID|  289|   6|2012|[oracle, java]|
|  MALE|WHITE NON HISP|     DAVID|  289|   6|2012|[oracle, java]|
|  MALE|      HISPANIC|   MATTHEW|  279|   8|2012|[oracle, java]|
|  MALE|      HISPANIC|   MATTHEW|  279|   8|2012|[oracle, java]|
|  MALE|      HISPANIC|     ETHAN|  254|  10|2012|[oracle, java]|
|  MALE|      HISPANIC|     ETHAN|  254|  10|2012|[oracle, java]|
|  MALE|WHITE NON HISP|   MICHAEL|  245|  12|2012|[oracle, java]|
|  MALE|WHITE NON HISP|   MICHAEL|  245|  12|2012|[oracle, java]|
|  MALE|WHITE NON HISP|     JACOB|  242|  14|2012|[oracle, java]|
|  MALE|WHITE NON HISP|     JACOB|  242|  14|2012|[oracle, java]|
|  MALE|WHITE NON HISP|     MOSHE|  238|  16|2012|[oracle, java]|
|  MALE|WHITE NON HISP|     MOSHE|  238|  16|2012|[oracle, java]|
|  MALE|      HISPANIC|     ANGEL|  236|  18|2012|[oracle, java]|
|  MALE|      HISPANIC|     AIDEN|  235|  19|2012|[oracle, java]|
|  MALE|WHITE NON HISP|    DANIEL|  232|  20|2012|[oracle, java]|
+------+--------------+----------+-----+----+----+--------------+
only showing top 20 rows

Adding new column to spark dataframe by getting data lookup from other dataframe

What you're looking for is quite straightforward, just multiple steps that can be a bit confusing if you don't write them out properly. You might find this answer basically similar to the other one, but with better structure. I added comments on each step; feel free to run the steps one at a time to follow the logic.

from pyspark.sql import functions as F

(df
    .withColumn('error_column', F.explode(F.split('error_column', ','))) # break multiple errors down into separate rows
    .join(look_up_df.withColumnRenamed('column_nm', 'error_column'), on=['error_column'], how='inner') # rename the lookup column so the `on` condition stays short
    .withColumn('error_desc', F.concat( # build the description as required
        F.col('error_column'),
        F.lit(' expected '),
        F.col('clmn1'),
        F.lit(' and comment is '),
        F.col('comment'),
    ))
    .groupBy('col1', 'col2') # now that the errors are broken down, group them back together
    .agg(
        F.concat_ws(', ', F.collect_list('error_column')).alias('error_column'), # concat errors together, comma separated
        F.concat_ws(', ', F.collect_list('error_desc')).alias('error_desc'), # concat descriptions together, comma separated
    )
    .show(10, False)
)

+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|col1|col2|error_column |error_desc |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+
|1 |4 |date_from, date_to|date_from expected DD-MM-YY and comment is text msg1, date_to expected DD-MM-YY and comment is test msg2|
|1 |8 |emp_name |emp_name expected VARCHAR(100) and comment is test msg3 |
+----+----+------------------+---------------------------------------------------------------------------------------------------------+


