Spark Add New Column With Value From Previous Rows or Other Columns

Spark add new column to dataframe with value from previous row

You can use the lag window function as follows:

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

but there are some important issues:

  1. If you need a global operation (not partitioned by some other column or columns), it is extremely inefficient.
  2. You need a natural way to order your data.

While the second issue is almost never a problem, the first one can be a deal-breaker. If this is the case, you should simply convert your DataFrame to an RDD and compute the lag manually (a minimal sketch appears after the links below). See for example:

  • How to transform data with sliding window over time series data in Pyspark
  • Apache Spark Moving Average (written in Scala, but can be adjusted for PySpark. Be sure to read the comments first).

Other useful links:

  • https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb
  • Spark Window Functions - rangeBetween dates
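
For the non-partitioned case, here is a minimal sketch of the RDD-based approach (my own illustration, not taken from the links above; it assumes the same id/num DataFrame as before and that sorting by id is acceptable):

indexed = df.orderBy("id").rdd.zipWithIndex()           # (Row, position in sorted order)
current = indexed.map(lambda x: (x[1], x[0]))           # (position, Row)
previous = indexed.map(lambda x: (x[1] + 1, x[0].num))  # previous row's num keyed by the next position
lagged = current.join(previous).map(lambda kv: kv[1][0] + (kv[1][1],))
lagged.toDF(df.columns + ["new_col"]).show()            # the inner join drops the first row, like na.drop()

Unlike the unpartitioned window, which pulls all rows into a single partition, the join stays distributed, at the cost of extra shuffles.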

Add new column to dataframe based on previous values and condition

// Base data: one amount per level1 value and year-end date
val amount = ss.sparkContext.parallelize(Seq(("B", "2014-12-31", 3456))).toDF("level1", "dateY", "amount")

// UDF that maps a date string to the previous year's year-end, e.g. "2014-12-31" -> "2013-12-31"
val yearStr = udf((date: String) => { (date.substring(0, 4).toInt - 1) + "-12-31" })

// Add the previous year's date as column "p"
val df3 = amount.withColumn("p", yearStr($"dateY"))

df3.show()

df3.createOrReplaceTempView("dfView")

// Year-end rows keyed by their own date, so they can be joined back as the "previous year" values
val df4 = df3.filter(s => s.getString(1).contains("12-31")).select($"dateY".as("p"), $"level1", $"amount".as("am"))

df4.show

// Left join on (previous year's date, level1) pulls the previous year's amount onto each row
df3.join(df4, Seq("p", "level1"), "left_outer").orderBy("level1", "amount").drop($"p").show()

Adding a new column in Data Frame derived from other columns (Spark)

One way to achieve that is to use the withColumn method:

old_df = sqlContext.createDataFrame(sc.parallelize(
[(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))

new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2)

Alternatively you can use SQL on a registered table:

old_df.registerTempTable('old_df')
new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df')
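
If you prefer to keep the SQL expression without registering a table, selectExpr gives the same result (a small sketch of an alternative, not part of the original answer):

new_df = old_df.selectExpr('*', 'col_1 - col_2 AS col_n')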

Add new column with maximum value of another column in pyspark dataframe

I don't think we can use aggregate functions directly in withColumn, but here are some workarounds for this case.
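
For reference, the naive attempt is expected to fail at analysis time; a minimal illustration (assuming the same df with an id column used below):

from pyspark.sql.functions import max as max_
try:
    df.withColumn("max", max_("id")).show()   # aggregate expression without GROUP BY or window
except Exception as e:                        # typically surfaces as an AnalysisException
    print(type(e).__name__)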

1. Using crossJoin:

from pyspark.sql.functions import *

df.show()
#+---+----+
#| id|name|
#+---+----+
#|  1|   a|
#|  2|   b|
#|  3|   c|
#+---+----+

df1 = df.agg(max('id'))
spark.sql("set spark.sql.crossJoin.enabled=true")

#cross join
df.join(df1)
#or
df.crossJoin(df1).show()
#+---+----+-------+
#| id|name|max(id)|
#+---+----+-------+
#|  1|   a|      3|
#|  2|   b|      3|
#|  3|   c|      3|
#+---+----+-------+

2. Using a Window function:

from pyspark.sql import *
import sys

w = Window.orderBy(monotonically_increasing_id()).rowsBetween(-sys.maxsize, sys.maxsize)
df.withColumn("max", max(col("id")).over(w)).show()
#+---+----+---+
#| id|name|max|
#+---+----+---+
#|  1|   a|  3|
#|  2|   b|  3|
#|  3|   c|  3|
#+---+----+---+

3. Using variable substitution:

max_value = df.agg(max("id")).collect()[0][0]

df.withColumn("max", lit(max_value)).show()

#or
max_value = lit(df.agg(max("id")).collect()[0][0])
type(max_value)
#<class 'pyspark.sql.column.Column'>
df.withColumn("max", max_value).show()
#+---+----+---+
#| id|name|max|
#+---+----+---+
#|  1|   a|  3|
#|  2|   b|  3|
#|  3|   c|  3|
#+---+----+---+

4. Using Spark SQL:

df.createOrReplaceTempView("tmp")
spark.sql("select * from tmp cross join (select max(id) max_val from tmp) t1").show()

spark.sql("select *,max(id) over(order by id rows between unbounded preceding and unbounded following) as max_val from tmp").show()

max_value=df.agg(max(col("id"))).collect()[0][0]
spark.sql("select *,{0} as max_val from tmp".format(max_value)).show()
#+---+----+-------+
#| id|name|max_val|
#+---+----+-------+
#|  1|   a|      3|
#|  2|   b|      3|
#|  3|   c|      3|
#+---+----+-------+

SPARK 3 - Populate value with value from previous rows (lookup)

Due to its distributed nature, Spark cannot implement logic along the lines of "if the value was already populated in a previous call, reuse it; otherwise make the call and create the value". There are two possible options.

  1. Since you are applying an inner join and the players df has the list of all distinct players, you can add the current_team column to this df before applying the join. If the players df is cached before joining, then it is likely that the UDF is invoked only once for each player. See the discussion here for why a UDF can be called multiple times for each record.
  2. You can memoize getCurrentTeam.

Working Example - Prepopulate current_team

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

events_data = [(1, 1, 1, 10), (1, 2, 1, 20, ), (1, 3, 1, 30, ), (2, 3, 1, 30, ), (2, 1, 1, 10), (2, 2, 1, 20, ), ]
players_data = [(1, "Player1", "Nat", ), (2, "Player2", "Nat", ), (3, "Player3", "Nat", ), ]

events = spark.createDataFrame(events_data, ("event_id", "player_id", "match_id", "impact_score", ), ).repartition(3)
players = spark.createDataFrame(players_data, ("player_id", "player_name", "nationality", ), ).repartition(3)


@udf(StringType())
def getCurrentTeam(player_id):
    return f"player_{player_id}_team"

players_with_current_team = players.withColumn("current_team", getCurrentTeam(F.col("player_id"))).cache()

events.join(players_with_current_team, ["player_id"]).show()

Output

+---------+--------+--------+------------+-----------+-----------+-------------+
|player_id|event_id|match_id|impact_score|player_name|nationality| current_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
|        2|       2|       1|          20|    Player2|        Nat|player_2_team|
|        2|       1|       1|          20|    Player2|        Nat|player_2_team|
|        3|       2|       1|          30|    Player3|        Nat|player_3_team|
|        3|       1|       1|          30|    Player3|        Nat|player_3_team|
|        1|       2|       1|          10|    Player1|        Nat|player_1_team|
|        1|       1|       1|          10|    Player1|        Nat|player_1_team|
+---------+--------+--------+------------+-----------+-----------+-------------+

Working Example - Memoization

I have used a Python dict to mimic caching and an accumulator to count the number of mimicked network calls made.

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import time

events_data = [(1, 1, 1, 10), (1, 2, 1, 20, ), (1, 3, 1, 30, ), (2, 3, 1, 30, ), (2, 1, 1, 10), (2, 2, 1, 20, ), ]
players_data = [(1, "Player1", "Nat", ), (2, "Player2", "Nat", ), (3, "Player3", "Nat", ), ]

events = spark.createDataFrame(events_data, ("event_id", "player_id", "match_id", "impact_score", ), ).repartition(3)
players = spark.createDataFrame(players_data, ("player_id", "player_name", "nationality", ), ).repartition(3)

players_events_joined = events.join(players, ["player_id"])

memoized_call_counter = spark.sparkContext.accumulator(0)

def memoize_call():
    cache = {}
    def getCurrentTeam(player_id):
        global memoized_call_counter
        cached_value = cache.get(player_id, None)
        if cached_value is not None:
            return cached_value
        # sleep to mimic a network call
        time.sleep(1)
        # increment the counter every time the cached value can't be looked up
        memoized_call_counter.add(1)
        cache[player_id] = f"player_{player_id}_team"
        return cache[player_id]
    return getCurrentTeam

getCurrentTeam_udf = udf(memoize_call(), StringType())

players_events_joined.withColumn("current_team", getCurrentTeam_udf(F.col("player_id"))).show()

Output

+---------+--------+--------+------------+-----------+-----------+-------------+
|player_id|event_id|match_id|impact_score|player_name|nationality| current_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
|        2|       2|       1|          20|    Player2|        Nat|player_2_team|
|        2|       1|       1|          20|    Player2|        Nat|player_2_team|
|        3|       2|       1|          30|    Player3|        Nat|player_3_team|
|        3|       1|       1|          30|    Player3|        Nat|player_3_team|
|        1|       2|       1|          10|    Player1|        Nat|player_1_team|
|        1|       1|       1|          10|    Player1|        Nat|player_1_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
>>> memoized_call_counter.value
3

Since there are 3 unique players in total, the logic after time.sleep(1) was called only three times. The number of calls depends on the number of workers, since the cache is not shared across workers; in the worst case it is the number of unique players times the number of workers. As I ran the example in local mode (with one worker), the logic was called exactly once per player.

Add new column in a dataframe depending on another dataframe's row values

Instead of working with DF2 directly, you can translate DF2 into instances of a specification case class, e.g.

case class Spec(columnName: String, inputColumns: Seq[String], actions: String*)

Create instances of above class

val specifications = Seq(
  Spec("new_col_name", Seq("serialNum", "testProperty"), "hash", "append")
)

Then you can process the columns as below:

val transformed = specifications
  .foldLeft(dtFrm)((df: DataFrame, spec: Spec) => df.transform(transformColumn(spec)))

def transformColumn(spec: Spec)(df: DataFrame): DataFrame = {
  spec.actions.foldLeft(df)((df: DataFrame, action: String) => {
    action match {
      // match on the action here, compute the value, then append it with df.withColumn
      case "append" => df
      case _        => df
    }
  })
}

The syntax may not be exactly correct.


