Append List of Lists as Column to PySpark's DataFrame (Concatenating Two DataFrames Without a Common Column)

Append list of lists as column to PySpark's dataframe (Concatenating two dataframes without common column)

UDFs are generally slow, but a more efficient way that avoids UDFs altogether would be:

import pyspark.sql.functions as F

# l (the list of lists) becomes a single-column dataframe; the column is named "value".
ldf = spark.createDataFrame(l, schema="array<int>")

# Give both dataframes a matching surrogate key, then join on it.
df1 = df.withColumn("m_id", F.monotonically_increasing_id())
df2 = ldf.withColumn("m_id", F.monotonically_increasing_id())

df3 = df2.join(df1, "m_id", "outer").drop("m_id")
df3.select("id", "value").show()
+---+------+
| id| value|
+---+------+
| a|[1, 1]|
| b|[2, 2]|
| d|[4, 4]|
| c|[3, 3]|
| e|[5, 5]|
+---+------+
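
The snippet above assumes an existing dataframe df (with the id column) and the Python list of lists l from the question; neither is shown in the answer. A minimal, assumed setup that reproduces this output would be:

# Hypothetical inputs, reconstructed from the output shown above
# (assumes an active SparkSession named spark, as elsewhere in this answer).
df = spark.createDataFrame([("a",), ("b",), ("c",), ("d",), ("e",)], ["id"])
l = [[1, 1], [2, 2], [3, 3], [4, 4], [5, 5]]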

How to conditionally combine two PySpark Dataframes?

A join on id, followed by the when/otherwise pattern, is what you are after. This code works for your example:

from pyspark.sql.functions import when

result = (df1.join(df2, on=['id'])
          .withColumn('_col1', when(df1['col1'].isNotNull(), df1['col1']).otherwise(df2['col1']))
          .withColumn('_col2', when(df1['col2'].isNotNull(), df1['col2']).otherwise(df2['col2']))
          .select('id', '_col1', '_col2', 'col3', 'col4')
          .toDF('id', 'col1', 'col2', 'col3', 'col4'))
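
As an illustration, here is a minimal runnable sketch with made-up inputs; the sample rows, and the assumption that df1 may contain nulls in col1/col2 while df2 carries col3 and col4, are not from the original question:

from pyspark.sql.functions import when

df1 = spark.createDataFrame([(1, "a", None), (2, None, "y")],
                            ["id", "col1", "col2"])
df2 = spark.createDataFrame([(1, "A", "X", "p", "q"), (2, "B", "Y", "r", "s")],
                            ["id", "col1", "col2", "col3", "col4"])

result = (df1.join(df2, on=['id'])
          .withColumn('_col1', when(df1['col1'].isNotNull(), df1['col1']).otherwise(df2['col1']))
          .withColumn('_col2', when(df1['col2'].isNotNull(), df1['col2']).otherwise(df2['col2']))
          .select('id', '_col1', '_col2', 'col3', 'col4')
          .toDF('id', 'col1', 'col2', 'col3', 'col4'))

# id 1 keeps df1's non-null col1 ("a") and falls back to df2's col2 ("X");
# id 2 falls back to df2's col1 ("B") and keeps df1's col2 ("y").
result.show()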

PySpark - Adding a Column from a list of values

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

# sample data
a = sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])
a.show()

# convert the list to a dataframe
rating = [5, 4, 1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])

# add a 'sequential' index and join both dataframes to get the final result
a = a.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))

final_df = a.join(b, a.row_idx == b.row_idx).drop("row_idx")
final_df.show()

Input:

+------+-----+
|Animal|Enemy|
+------+-----+
| Dog| Cat|
| Cat| Dog|
| Mouse| Cat|
+------+-----+

Output is:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
| Cat| Dog| 4|
| Dog| Cat| 5|
| Mouse| Cat| 1|
+------+-----+------+
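
An alternative sketch that avoids the Window entirely is to pair rows by position with RDD.zipWithIndex (a different technique from the answer above; it assumes the same a and b dataframes before the row_idx columns were added):

from pyspark.sql import Row

# Attach a positional index to each row via the underlying RDD,
# then join the two dataframes on that index.
a_idx = a.rdd.zipWithIndex().map(lambda r: Row(row_idx=r[1], **r[0].asDict())).toDF()
b_idx = b.rdd.zipWithIndex().map(lambda r: Row(row_idx=r[1], **r[0].asDict())).toDF()

final_df = a_idx.join(b_idx, "row_idx").drop("row_idx")
final_df.show()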

How to concatenate two dataframes without duplicates?

The simplest way is to just do the concatenation, and then drop duplicates.

>>> df1
A B
0 1 2
1 3 1
>>> df2
A B
0 5 6
1 3 1
>>> pandas.concat([df1,df2]).drop_duplicates().reset_index(drop=True)
A B
0 1 2
1 3 1
2 5 6

The reset_index(drop=True) fixes up the index after the concat() and drop_duplicates(). Without it, you will have an index of [0, 1, 0] instead of [0, 1, 2], which could cause problems for further operations on this dataframe if it isn't reset right away.
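
For reference, a minimal setup that reproduces the frames shown above (reconstructed from the printed output, not taken from the original question):

import pandas

df1 = pandas.DataFrame({"A": [1, 3], "B": [2, 1]})
df2 = pandas.DataFrame({"A": [5, 3], "B": [6, 1]})

# Concatenate, drop the duplicated (3, 1) row, and rebuild a clean 0..2 index.
print(pandas.concat([df1, df2]).drop_duplicates().reset_index(drop=True))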

Appending two dataframes with same columns, different order

You could also use pd.concat:

In [36]: pd.concat([noclickDF, clickDF], ignore_index=True)
Out[36]:
click id location
0 0 123 321
1 0 1543 432
2 1 421 123
3 1 436 1543

Under the hood, DataFrame.append calls pd.concat. DataFrame.append has extra code for handling various types of input, such as Series, tuples, lists and dicts; if you pass it a DataFrame, it passes straight through to pd.concat, so using pd.concat is a bit more direct. (Note that DataFrame.append was deprecated in pandas 1.4 and removed in pandas 2.0, so pd.concat is also the forward-compatible choice.)
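
For reference, definitions of noclickDF and clickDF that would reproduce the output above (these are assumptions reconstructed from Out[36]; the original question defines them elsewhere). The two frames share the same columns but list them in a different order, and pd.concat aligns them by name:

import pandas as pd

noclickDF = pd.DataFrame([[0, 123, 321], [0, 1543, 432]],
                         columns=["click", "id", "location"])
clickDF = pd.DataFrame([[1, 123, 421], [1, 1543, 436]],
                       columns=["click", "location", "id"])

print(pd.concat([noclickDF, clickDF], ignore_index=True))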

Join condition on data frames with list as entries

What about the approach below?

Cross join both data frames, add a column with the array_intersect function, and then filter the joined dataset, keeping rows where the size of the intersected column is greater than 0.

For example:

from pyspark.sql import functions as fn

df1 = spark.read  # ... read your first source
df2 = spark.read  # ... read your other source

joined = df1.crossJoin(df2) \
    .withColumn("common_join_keys", fn.array_intersect(fn.col("joinkey1"), fn.col("joinkey2")))

result = joined.filter(fn.size(fn.col("common_join_keys")) > 0)  # your condition

result.show(truncate=False)
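
A self-contained sketch of the same idea with small made-up data (the column names joinkey1/joinkey2 come from the snippet above; the sample rows and the name1/name2 columns are assumptions):

from pyspark.sql import functions as fn

df1 = spark.createDataFrame([("a", [1, 2]), ("b", [3])], ["name1", "joinkey1"])
df2 = spark.createDataFrame([("x", [2, 5]), ("y", [4])], ["name2", "joinkey2"])

joined = df1.crossJoin(df2) \
    .withColumn("common_join_keys", fn.array_intersect(fn.col("joinkey1"), fn.col("joinkey2")))

# Only the ("a", [1, 2]) x ("x", [2, 5]) pair survives, since their key arrays share 2.
result = joined.filter(fn.size(fn.col("common_join_keys")) > 0)
result.show(truncate=False)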

How to join/merge a list of dataframes with common keys in PySpark?

You can join a list of dataframes by reducing over it. Below is a simple example (in Scala):

import spark.implicits._

val df1 = spark.sparkContext.parallelize(Seq(
  (0, "John", 3),
  (1, "Paul", 4),
  (2, "George", 5)
)).toDF("id", "uid1", "var1")

val df2 = spark.sparkContext.parallelize(Seq(
  (0, "John", 23),
  (1, "Paul", 44),
  (2, "George", 52)
)).toDF("id", "uid1", "var2")

val df3 = spark.sparkContext.parallelize(Seq(
  (0, "John", 31),
  (1, "Paul", 45),
  (2, "George", 53)
)).toDF("id", "uid1", "var3")

val dfs = List(df1, df2, df3)

dfs.reduce((a, b) => a.join(b, Seq("id", "uid1")))

Output:

+---+------+----+----+----+
| id| uid1|var1|var2|var3|
+---+------+----+----+----+
| 1| Paul| 4| 44| 45|
| 2|George| 5| 52| 53|
| 0| John| 3| 23| 31|
+---+------+----+----+----+

Hope this helps!
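
Since the question asks about PySpark, the same reduce-based join can be written in Python; a sketch, assuming the same three sample dataframes as the Scala answer:

from functools import reduce

df1 = spark.createDataFrame([(0, "John", 3), (1, "Paul", 4), (2, "George", 5)],
                            ["id", "uid1", "var1"])
df2 = spark.createDataFrame([(0, "John", 23), (1, "Paul", 44), (2, "George", 52)],
                            ["id", "uid1", "var2"])
df3 = spark.createDataFrame([(0, "John", 31), (1, "Paul", 45), (2, "George", 53)],
                            ["id", "uid1", "var3"])

# Fold the list of dataframes into one by repeatedly joining on the common keys.
df = reduce(lambda a, b: a.join(b, ["id", "uid1"]), [df1, df2, df3])
df.show()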


