Spark add new column to dataframe with value from previous row
You can use the lag window function as follows:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## | 2|3.0| 5.0|
## | 3|7.0| 3.0|
## | 4|9.0| 7.0|
## +---+---+-------+
but there are some important issues:
- if you need a global operation (not partitioned by some other column / columns) it is extremely inefficient.
- you need a natural way to order your data.
While the second issue is almost never a problem, the first one can be a deal-breaker. If this is the case you should simply convert your DataFrame to an RDD and compute lag manually (a minimal sketch follows the links below). See for example:
- How to transform data with sliding window over time series data in Pyspark
- Apache Spark Moving Average (written in Scala, but can be adjusted for PySpark. Be sure to read the comments first).
Other useful links:
- https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb
- Spark Window Functions - rangeBetween dates
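As referenced above, here is a minimal sketch of the RDD-based approach, assuming the df defined at the top of this answer, that "id" provides the ordering, and that the ordering produced by orderBy is preserved through zipWithIndex; the sort is still global, but the data is not forced onto a single partition:
# Minimal sketch of computing lag manually via the RDD API
rdd = df.orderBy("id").rdd.zipWithIndex()              # (Row, position)
current = rdd.map(lambda x: (x[1], x[0]))              # keyed by position
previous = rdd.map(lambda x: (x[1] + 1, x[0]["num"]))  # keyed by the next position
lagged = (current.join(previous)                       # inner join drops the first row, like na.drop()
          .map(lambda x: (x[1][0]["id"], x[1][0]["num"], x[1][1])))
lagged.toDF(["id", "num", "new_col"]).orderBy("id").show()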
Add new column to dataframe based on previous values and condition
// Sample input: one row per level with a year-end date and an amount
val amount = ss.sparkContext.parallelize(Seq(("B", "2014-12-31", 3456))).toDF("level1", "dateY", "amount")
// UDF that maps a date string to the previous year's December 31st
val yearStr = udf((date: String) => { (date.substring(0, 4).toInt - 1) + "-12-31" })
// Add a column "p" holding the previous year-end date
val df3 = amount.withColumn("p", yearStr($"dateY"))
df3.show()
df3.createOrReplaceTempView("dfView")
// Keep only year-end rows and rename the columns so they can serve as the "previous year" values
val df4 = df3.filter(s => s.getString(1).contains("12-31")).select($"dateY".as("p"), $"level1", $"amount".as("am"))
df4.show
// Left join on the previous year-end date to pull the prior year's amount onto each row
df3.join(df4, Seq("p", "level1"), "left_outer").orderBy("level1", "amount").drop($"p").show()
Adding a new column in Data Frame derived from other columns (Spark)
One way to achieve that is to use the withColumn method:
old_df = sqlContext.createDataFrame(sc.parallelize(
[(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))
new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2)
Alternatively you can use SQL on a registered table:
old_df.registerTempTable('old_df')
new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df')
Add new column with maximum value of another column in pyspark dataframe
I don't think we can use aggregate functions in withColumn, but here are some workarounds for this case.
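For reference, a quick illustration of why the direct approach fails (hedged: the exact error message varies by Spark version, but an aggregate expression projected next to non-aggregated columns is rejected by the analyzer):
from pyspark.sql.functions import max
from pyspark.sql.utils import AnalysisException
try:
    # max("id") is an aggregate expression, so projecting it alongside the
    # non-aggregated columns of df raises an AnalysisException
    df.withColumn("max", max("id")).show()
except AnalysisException as e:
    print("withColumn with an aggregate failed:", type(e).__name__)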
1. Using crossJoin:
from pyspark.sql.functions import *
df.show()
#+---+----+
#| id|name|
#+---+----+
#| 1| a|
#| 2| b|
#| 3| c|
#+---+----+
df1=df.agg(max('id'))
spark.sql("set spark.sql.crossJoin.enabled=true")
#cross join
df.join(df1)
#or
df.crossJoin(df1).show()
#+---+----+-------+
#| id|name|max(id)|
#+---+----+-------+
#| 1| a| 3|
#| 2| b| 3|
#| 3| c| 3|
#+---+----+-------+
2. Using Window function:
from pyspark.sql import *
import sys
w=Window.orderBy(monotonically_increasing_id()).rowsBetween(-sys.maxsize,sys.maxsize)
df.withColumn("max",max(col("id")).over(w)).show()
#+---+----+---+
#| id|name|max|
#+---+----+---+
#| 1| a| 3|
#| 2| b| 3|
#| 3| c| 3|
#+---+----+---+
3. Using variable substitution:
max_value=df.agg(max("id")).collect()[0][0]
df.withColumn("max",lit(max_value)).show()
#or
max_value=lit(df.agg(max("id")).collect()[0][0])
type(max_value)
#<class 'pyspark.sql.column.Column'>
df.withColumn("max",max_value).show()
#+---+----+---+
#| id|name|max|
#+---+----+---+
#| 1| a| 3|
#| 2| b| 3|
#| 3| c| 3|
#+---+----+---+
4. Using Spark SQL:
df.createOrReplaceTempView("tmp")
spark.sql("select * from tmp cross join (select max(id) max_val from tmp) t1").show()
spark.sql("select *,max(id) over(order by id rows between unbounded preceding and unbounded following) as max_val from tmp").show()
max_value=df.agg(max(col("id"))).collect()[0][0]
spark.sql("select *,{0} as max_val from tmp".format(max_value)).show()
#+---+----+-------+
#| id|name|max_val|
#+---+----+-------+
#| 1| a| 3|
#| 2| b| 3|
#| 3| c| 3|
#+---+----+-------+
SPARK 3 - Populate value with value from previous rows (lookup)
Due to its distributed nature, Spark cannot express "if this value was already populated by a previous call, reuse it; otherwise make the call and store the result". There are two possible options.
- Since you are applying an inner join and the players df has the list of all distinct players, you can add the current_team column to this df before applying the join. If the players df is cached before joining, then it's possible that the UDF is invoked only once for each player. See the discussion here for why a UDF can be called multiple times for each record.
- You can memoize getCurrentTeam.
Working Example - Prepopulate current_team
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
events_data = [(1, 1, 1, 10), (1, 2, 1, 20, ), (1, 3, 1, 30, ), (2, 3, 1, 30, ), (2, 1, 1, 10), (2, 2, 1, 20, ), ]
players_data = [(1, "Player1", "Nat", ), (2, "Player2", "Nat", ), (3, "Player3", "Nat", ), ]
events = spark.createDataFrame(events_data, ("event_id", "player_id", "match_id", "impact_score", ), ).repartition(3)
players = spark.createDataFrame(players_data, ("player_id", "player_name", "nationality", ), ).repartition(3)
@udf(StringType())
def getCurrentTeam(player_id):
    return f"player_{player_id}_team"
players_with_current_team = players.withColumn("current_team", getCurrentTeam(F.col("player_id"))).cache()
events.join(players_with_current_team, ["player_id"]).show()
Output
+---------+--------+--------+------------+-----------+-----------+-------------+
|player_id|event_id|match_id|impact_score|player_name|nationality| current_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
| 2| 2| 1| 20| Player2| Nat|player_2_team|
| 2| 1| 1| 20| Player2| Nat|player_2_team|
| 3| 2| 1| 30| Player3| Nat|player_3_team|
| 3| 1| 1| 30| Player3| Nat|player_3_team|
| 1| 2| 1| 10| Player1| Nat|player_1_team|
| 1| 1| 1| 10| Player1| Nat|player_1_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
Working Example - Memoization
I have used a Python dict to mimic caching, and an accumulator to count the number of mimicked network calls made.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import time
events_data = [(1, 1, 1, 10), (1, 2, 1, 20, ), (1, 3, 1, 30, ), (2, 3, 1, 30, ), (2, 1, 1, 10), (2, 2, 1, 20, ), ]
players_data = [(1, "Player1", "Nat", ), (2, "Player2", "Nat", ), (3, "Player3", "Nat", ), ]
events = spark.createDataFrame(events_data, ("event_id", "player_id", "match_id", "impact_score", ), ).repartition(3)
players = spark.createDataFrame(players_data, ("player_id", "player_name", "nationality", ), ).repartition(3)
players_events_joined = events.join(players, ["player_id"])
memoized_call_counter = spark.sparkContext.accumulator(0)
def memoize_call():
    cache = {}
    def getCurrentTeam(player_id):
        global memoized_call_counter
        cached_value = cache.get(player_id, None)
        if cached_value is not None:
            return cached_value
        # sleep to mimic a network call
        time.sleep(1)
        # increment the counter every time the cached value can't be looked up
        memoized_call_counter.add(1)
        cache[player_id] = f"player_{player_id}_team"
        return cache[player_id]
    return getCurrentTeam
getCurrentTeam_udf = udf(memoize_call(), StringType())
players_events_joined.withColumn("current_team", getCurrentTeam_udf(F.col("player_id"))).show()
Output
+---------+--------+--------+------------+-----------+-----------+-------------+
|player_id|event_id|match_id|impact_score|player_name|nationality| current_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
| 2| 2| 1| 20| Player2| Nat|player_2_team|
| 2| 1| 1| 20| Player2| Nat|player_2_team|
| 3| 2| 1| 30| Player3| Nat|player_3_team|
| 3| 1| 1| 30| Player3| Nat|player_3_team|
| 1| 2| 1| 10| Player1| Nat|player_1_team|
| 1| 1| 1| 10| Player1| Nat|player_1_team|
+---------+--------+--------+------------+-----------+-----------+-------------+
>>> memoized_call_counter.value
3
Since there are 3 unique players in total, the logic after time.sleep(1) was called only thrice. The number of calls depends on the number of workers, since the cache is not shared across workers. As I ran the example in local mode (with 1 worker), the number of calls equals the number of unique players; with more workers it could be up to that number per worker.
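If you need the lookup to happen exactly once per player regardless of the number of workers, one option (not from the original answer; the names below are illustrative) is to precompute the mapping on the driver and broadcast it, assuming the set of players is small enough to collect:
# Build the player -> team mapping once on the driver, then broadcast it
player_ids = [r.player_id for r in players.select("player_id").distinct().collect()]
team_by_player = {pid: f"player_{pid}_team" for pid in player_ids}  # one mimicked call per player
team_by_player_bc = spark.sparkContext.broadcast(team_by_player)

@udf(StringType())
def lookupCurrentTeam(player_id):
    # Read from the broadcast dict; no network call happens on the workers
    return team_by_player_bc.value.get(player_id)

players_events_joined.withColumn("current_team", lookupCurrentTeam(F.col("player_id"))).show()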
add new column in a dataframe depending on another dataframe's row values
Instead of using DF2 directly, you can translate DF2 into a case class of specifications, e.g.:
case class Spec(columnName: String, inputColumns: Seq[String], action: String, types: String*)
Create instances of the above class:
val specifications = Seq(
Spec("new_col_name",Seq("serialNum","testProperty"),"hash","append")
)
Then you can process the specifications as below:
val transformed = specifications
  .foldLeft(dtFrm)((df: DataFrame, spec: Spec) => df.transform(transformColumn(spec)))

def transformColumn(spec: Spec)(df: DataFrame): DataFrame = {
  spec.types.foldLeft(df)((df: DataFrame, t: String) => {
    t match {
      case "append" =>
        // match on spec.action here, compute the new value from spec.inputColumns,
        // then append it with df.withColumn(spec.columnName, ...)
        df
      case _ => df
    }
  })
}
The syntax may not be exactly correct, but this outlines the approach.
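A rough PySpark equivalent of the same idea (hedged sketch; the names and the "hash" behavior are illustrative, not from the original answer): fold a list of column specifications over the DataFrame.
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import concat_ws, sha2

specs = [
    # (new column name, input columns, action)
    ("new_col_name", ["serialNum", "testProperty"], "hash"),
]

def apply_spec(df: DataFrame, spec) -> DataFrame:
    name, input_cols, action = spec
    if action == "hash":
        # hash the concatenation of the input columns and append it as a new column
        return df.withColumn(name, sha2(concat_ws("|", *input_cols), 256))
    return df

transformed = reduce(apply_spec, specs, dtFrm)  # dtFrm is the source DataFrame from the question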