Pyspark Add New Row to Dataframe

Add new rows to pyspark Dataframe

As thebluephantom has already said, union is the way to go. I'm just answering your question to give you a PySpark example:

# Reuse the active SparkSession if one exists; otherwise create it.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

columns = ['id', 'dogs', 'cats']
vals = [(1, 2, 0), (2, 0, 1)]

df = spark.createDataFrame(vals, columns)

# DataFrames are immutable, so "appending" means building a one-row DataFrame
# with the same column list and union-ing it onto the original.
newRow = spark.createDataFrame([(4, 5, 7)], columns)
appended = df.union(newRow)
appended.show()

Please also have a look at the Databricks FAQ: https://kb.databricks.com/data/append-a-row-to-rdd-or-dataframe.html

pyspark add new row to dataframe

Try: (Documentation)

from pyspark.sql import Row

# `sc` is expected to be an existing SparkContext (it is not created here).
# Build a one-row DataFrame from a Row object and display it.
newDf = sc.parallelize([Row(id='ID123')]).toDF()
newDf.show()

Pyspark add row based on a condition

Instead of "inserting a row" – which is a non-trivial problem to solve – think of it as "unioning datasets".

Assuming this is your dataset
# Sample data: columns a (id), b (status text) and c (timestamp stored as text)
df = spark.createDataFrame([
(1, 'open', '01.01.22 10:05:04'),
(1, 'In process', '01.01.22 10:07:02'),
], ['a', 'b', 'c'])

+---+----------+-----------------+
| a| b| c|
+---+----------+-----------------+
| 1| open|01.01.22 10:05:04|
| 1|In process|01.01.22 10:07:02|
+---+----------+-----------------+
Based on your rule, we can construct another dataset like this
from pyspark.sql import functions as F

# Custom text format used by column 'c'; shared by the parse and format steps.
TS_FORMAT = 'dd.MM.yy HH:mm:ss'

df_new = (
    df
    .where(F.col('b') == 'open')
    .withColumn('b', F.lit('Before open'))
    # parse the text into a real timestamp so interval arithmetic works
    .withColumn('c', F.to_timestamp('c', TS_FORMAT))
    .withColumn('c', F.col('c') - F.expr('interval 1 hour'))  # subtract 1 hour
    # render the timestamp back into the original text format
    .withColumn('c', F.date_format('c', TS_FORMAT))
)

+---+-----------+-----------------+
| a| b| c|
+---+-----------+-----------------+
| 1|Before open|01.01.22 09:05:04|
+---+-----------+-----------------+
Now you just need to union them together, and sort if you want to "see" it
# Sort on the parsed timestamp, not on the raw text: 'dd.MM.yy ...' strings
# compare day-first, so a plain string sort stops being chronological as soon
# as the rows span more than one month or year.
(
    df
    .union(df_new)
    .orderBy('a', F.to_timestamp('c', 'dd.MM.yy HH:mm:ss'))
    .show()
)

+---+-----------+-----------------+
| a| b| c|
+---+-----------+-----------------+
| 1|Before open|01.01.22 09:05:04|
| 1| open|01.01.22 10:05:04|
| 1| In process|01.01.22 10:07:02|
+---+-----------+-----------------+

Add rows of data to each group in a Spark dataframe

See my attempt below. I could have made it shorter, but I felt it should be as explicit as possible, so I didn't chain the solutions. Code below:

from pyspark.sql import Window
from pyspark.sql import functions as F

# The "yyyywwu" pattern below relies on week-based pattern letters, hence the
# legacy time parser is enabled first.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Convert the week-of-year value to a date ("6" is appended as the day digit)
s = (
    data_df
    .withColumn("week", F.expr("cast (week as string)"))
    .withColumn("date", F.to_date(F.concat("week", F.lit("6")), "yyyywwu"))
)

# Put sales and dates in arrays per item/store, and attach the id sequence
# 0..6 that defines the required expansion range per group
s = (
    s.groupby("item", "store")
    .agg(
        F.collect_list("sales").alias("sales"),
        F.collect_list("date").alias("date"),
    )
    .withColumn("id", F.sequence(F.lit(0), F.lit(6)))
)

# Explode the dataframe back to one row per id for each item/store combination
s = s.selectExpr("item", "store", "inline(arrays_zip(date,id,sales))")

# Window spanning the whole item/store partition, from first to last row
w = (
    Window.partitionBy("item", "store")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Running window per item/store/date; its purpose is to number the rows whose
# date is null (they share one partition) as a group
w1 = (
    Window.partitionBy("item", "store", "date")
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

s = (
    s
    # days to add for each null-date row: 7, 14, 21, ... (0 for real dates)
    .withColumn(
        "increment",
        F.when(F.col("date").isNull(), F.row_number().over(w1) * 7).otherwise(0),
    )
    # null dates fall back to the last (max) date of the item/store combination
    .withColumn(
        "date1",
        F.when(F.col("date").isNull(), F.max("date").over(w)).otherwise(F.col("date")),
    )
)

# Compute the week of year and drop the helper columns; remaining nulls become 0
s = (
    s.withColumn(
        "weekofyear",
        F.expr("weekofyear(date_add(date1, cast(increment as int)))"),
    )
    .drop("date", "increment", "date1")
    .na.fill(0)
)

s.show(truncate=False)

Outcome

+----+-----+---+-----+----------+
|item|store|id |sales|weekofyear|
+----+-----+---+-----+----------+
|1 |1 |0 |3 |5 |
|1 |1 |1 |5 |6 |
|1 |1 |2 |7 |7 |
|1 |1 |3 |2 |8 |
|1 |1 |4 |0 |9 |
|1 |1 |5 |0 |10 |
|1 |1 |6 |0 |11 |
|2 |2 |0 |3 |50 |
|2 |2 |1 |0 |51 |
|2 |2 |2 |1 |52 |
|2 |2 |3 |1 |1 |
|2 |2 |4 |0 |2 |
|2 |2 |5 |0 |3 |
|2 |2 |6 |0 |4 |
+----+-----+---+-----+----------+

How to add a new column with random chars to pyspark dataframe

You can use the uuid function to generate a string, and then replace the - in it.

# Generate a uuid string and strip the "-" characters from it via a SQL expression
df = df.withColumn("randomid", F.expr('replace(uuid(), "-", "")'))

how to sequentially iterate rows in Pyspark Dataframe

Problem solved.
Even though this approach is expensive, it's OK.

  def check(part):
      """Pair up rows with differing ``nature`` and flag both as repeated.

      Rows are scanned in order. Each row that is not yet flagged is matched
      with the first later row that has a different ``nature`` and is itself
      not yet flagged; both rows then get ``repeated = True``.

      Args:
          part: pandas DataFrame with at least the columns ``nature`` and
              ``repeated``.

      Returns:
          A copy of ``part`` (same index and columns) with the updated
          ``repeated`` column. The input frame is left untouched.
      """
      df = part.copy()
      # Work on plain lists: access is positional, so the frame's index labels
      # do not matter, and it avoids per-cell ``.loc`` lookups in the O(n^2) scan.
      natures = df['nature'].tolist()
      flags = df['repeated'].tolist()
      size = len(flags)
      for i in range(size):
          # Explicit comparisons keep the original semantics: a null flag is
          # neither "already repeated" here nor "available" in the inner scan.
          if flags[i] == True:  # noqa: E712
              continue
          for j in range(i + 1, size):
              if natures[i] != natures[j] and flags[j] == False:  # noqa: E712
                  flags[i] = True
                  flags[j] = True
                  break
      df['repeated'] = flags
      return df

# Apply check() to each (Account, value) group; the schema string declares the columns of the result
df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()

Update1:
Another solution without any iterations.

def check(df):
    """Flag rows via XOR of the ascending- and descending-sorted ``nature``.

    The frame is ordered by ``verified_time``, then sorted by ``nature`` once
    ascending and once descending. ``IS_REPEATED`` for the row at position
    ``p`` of the ascending copy is ``nature`` XOR the ``nature`` found at
    position ``p`` of the descending copy. Rows are finally returned ordered
    by their original index labels, without the helper ``index`` column.

    Stable sorts are used so that, among rows with equal ``nature``, the
    ``verified_time`` order established first is actually kept (the default
    sort algorithm does not guarantee this).

    Args:
        df: pandas DataFrame with columns ``verified_time`` and ``nature``;
            ``nature`` must support ``^`` (e.g. integer 0/1 values).

    Returns:
        A new DataFrame with an added/overwritten ``IS_REPEATED`` column.
    """
    df = df.sort_values('verified_time', kind='mergesort')
    df['index'] = df.index  # remember original labels to restore the order later
    df['IS_REPEATED'] = 0
    df1 = df.sort_values(['nature'], ascending=[True], kind='mergesort').reset_index(drop=True)
    df2 = df.sort_values(['nature'], ascending=[False], kind='mergesort').reset_index(drop=True)
    # Both copies now share a 0..n-1 index, so the XOR aligns by position.
    df1['IS_REPEATED'] = df1['nature'] ^ df2['nature']
    df3 = df1.sort_values(['index'], ascending=[True])
    return df3.drop(['index'], axis=1)

# NOTE(review): gf.check2 and gf.get_schema come from a module not shown here;
# presumably check2 is the check(df) function defined above — confirm.
df = df.groupby("account", "value").applyInPandas(gf.check2,schema=gf.get_schema('trx'))

UPDATE2:
Solution with Spark window:

def is_repeated_feature(df):
    """Add an ``is_repeated`` flag using Spark window functions only.

    Columns added to ``df`` (all are kept in the result):
      * ``rank``: row number within each (account, value, nature) partition.
      * ``count_nature`` / ``sum_nature``: count and sum of ``nature`` over the
        whole (account, value) partition.
      * ``min_val``: the smaller of ``sum_nature`` and
        ``count_nature - sum_nature``.
      * ``more_than_one``: '1' when the partition has more than one row,
        otherwise '0'.
      * ``is_repeated``: '1' when ``more_than_one`` is 1, ``count_nature``
        exceeds ``sum_nature`` and ``rank <= min_val``; otherwise '0'.

    Args:
        df: Spark DataFrame with columns ``account``, ``value`` and ``nature``.

    Returns:
        The DataFrame with the columns listed above appended.
    """
    windowPartition = Window.partitionBy("account", "value", 'nature').orderBy('nature')
    df_1 = df.withColumn('rank', F.row_number().over(windowPartition))

    # Frame covering the entire (account, value) partition
    w = (
        Window
        .partitionBy('account', 'value')
        .orderBy('nature')
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )
    df_1 = df_1.withColumn("count_nature", F.count('nature').over(w))
    df_1 = df_1.withColumn('sum_nature', F.sum('nature').over(w))

    remainder = df_1.count_nature - df_1.sum_nature
    df_2 = df_1.withColumn(
        'min_val',
        F.when(df_1.sum_nature > remainder, remainder).otherwise(df_1.sum_nature),
    )
    df_2 = df_2.withColumn(
        'more_than_one',
        F.when(df_2.count_nature > 1, '1').otherwise('0'),
    )
    df_2 = df_2.withColumn(
        'is_repeated',
        F.when(
            (df_2.more_than_one == 1)
            & (df_2.count_nature > df_2.sum_nature)
            & (df_2.rank <= df_2.min_val),
            '1',
        ).otherwise('0'),
    )
    return df_2

Add total per group as a new row in dataframe in Pyspark

I think you just need the rollup method.

# Grouping columns, from the coarsest level to the finest; used for both the
# rollup hierarchy and the sort order.
agg_cols = ["week_num", "parent", "brand", "channel"]

agg_df = (
    df.rollup(*agg_cols)
    # grouping_id() identifies which rollup level each output row belongs to
    .agg(F.sum("usage").alias("usage"), F.grouping_id().alias("lvl"))
    .orderBy(*agg_cols)
)

agg_df.show()
+--------+------+-----+-------+-----+---+
|week_num|parent|brand|channel|usage|lvl|
+--------+------+-----+-------+-----+---+
| null| null| null| null|31658| 15|
| 2| null| null| null| 6000| 7|
| 2| A| null| null| 6000| 3|
| 2| A| A2| null| 6000| 1|
| 2| A| A2| A2TV| 3500| 0|
| 2| A| A2| A2web| 2500| 0|
| 4| null| null| null|20700| 7|
| 4| A| null| null| 7500| 3|
| 4| A| A1| null| 5500| 1|
| 4| A| A1| A2app| 5500| 0|
| 4| A| AD| null| 2000| 1|
| 4| A| AD| ADapp| 2000| 0|
| 4| B| null| null|13200| 3|
| 4| B| B25| null| 7600| 1|
| 4| B| B25| B25app| 7600| 0|
| 4| B| B26| null| 5600| 1|
| 4| B| B26| B26app| 5600| 0|
| 5| null| null| null| 4958| 7|
| 5| C| null| null| 4958| 3|
| 5| C| c25| null| 2658| 1|
+--------+------+-----+-------+-----+---+
only showing top 20 rows

The rest is purely cosmetic. It is probably not a good idea to do that with Spark; better to do it in the reporting tool you will use afterwards.

# Renumber the grouping ids densely in ascending order. With the ids shown in
# the table above (0, 1, 3, 7, 15) this presumably yields lvl 1..5, where
# 1 = detail row and 5 = grand total — confirm against your data.
agg_df = agg_df.withColumn("lvl", F.dense_rank().over(Window.orderBy("lvl")))

# Label written into the first rolled-up column of each subtotal row
TOTAL = "Total"
agg_df = (
# lvl 4: parent is replaced by the label
agg_df.withColumn(
"parent", F.when(F.col("lvl") == 4, TOTAL).otherwise(F.col("parent"))
)
# lvl 3: brand is replaced by the label; other null brands become ""
.withColumn(
"brand",
F.when(F.col("lvl") == 3, TOTAL).otherwise(
F.coalesce(F.col("brand"), F.lit(""))
),
)
# lvl 2: channel is replaced by the label; other null channels become ""
.withColumn(
"channel",
F.when(F.col("lvl") == 2, TOTAL).otherwise(
F.coalesce(F.col("channel"), F.lit(""))
),
)
)

# Drop the lvl 5 rows, list each week's higher levels before its detail rows,
# then hide the helper column.
agg_df.where(F.col("lvl") != 5).orderBy(
"week_num", F.col("lvl").desc(), "parent", "brand", "channel"
).drop("lvl").show(500)

+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| Total| | | 6000|
| 2| A|Total| | 6000|
| 2| A| A2| Total| 6000|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 4| Total| | |20700|
| 4| A|Total| | 7500|
| 4| B|Total| |13200|
| 4| A| A1| Total| 5500|
| 4| A| AD| Total| 2000|
| 4| B| B25| Total| 7600|
| 4| B| B26| Total| 5600|
| 4| A| A1| A2app| 5500|
| 4| A| AD| ADapp| 2000|
| 4| B| B25| B25app| 7600|
| 4| B| B26| B26app| 5600|
| 5| Total| | | 4958|
| 5| C|Total| | 4958|
| 5| C| c25| Total| 2658|
| 5| C| c27| Total| 1100|
| 5| C| c28| Total| 1200|
| 5| C| c25| c25app| 2658|
| 5| C| c27| c27app| 1100|
| 5| C| c28| c26app| 1200|
+--------+------+-----+-------+-----+

Pyspark RDD create 2 rows from one row into new Dataframe

when expression + explode literal array:

from pyspark.sql import functions as F

# Literal array whose length is the number of output rows wanted per input
# row: one element for "start", two for "finished".
repeats = (
    F.when(F.col("type") == "start", F.array(F.lit(1)))
    .when(F.col("type") == "finished", F.array(F.lit(1), F.lit(1)))
)

# Exploding the array emits one row (with count = 1) per element.
exploded = df.withColumn("count", F.explode(repeats))
df1 = exploded.drop("platform", "type")

df1.show()

#+--------------+-----+
#| game|count|
#+--------------+-----+
#| valorant| 1|
#|counter-strike| 1|
#| sims| 1|
#| sims| 1|
#+--------------+-----+


Related Topics



Leave a reply



Submit