Append list of lists as column to PySpark's dataframe (Concatenating two dataframes without common column)
UDFs are generally slow; a more efficient approach that avoids UDFs entirely would be:
import pyspark.sql.functions as F
# l is the list of lists from the question; each inner list becomes one array row
ldf = spark.createDataFrame(l, schema="array<int>")
# give both dataframes a matching id column, join on it, then drop it
df1 = df.withColumn("m_id", F.monotonically_increasing_id())
df2 = ldf.withColumn("m_id", F.monotonically_increasing_id())
df3 = df2.join(df1, "m_id", "outer").drop("m_id")
df3.select("id", "value").show()
+---+------+
| id| value|
+---+------+
| a|[1, 1]|
| b|[2, 2]|
| d|[4, 4]|
| c|[3, 3]|
| e|[5, 5]|
+---+------+
How to conditionally combine two PySpark Dataframes?
A join on id, followed by the when/otherwise pattern, is what you are after. This code works for your example:
from pyspark.sql.functions import when
result = (df1.join(df2, on=['id'])
          .withColumn('_col1', when(df1['col1'].isNotNull(), df1['col1']).otherwise(df2['col1']))
          .withColumn('_col2', when(df1['col2'].isNotNull(), df1['col2']).otherwise(df2['col2']))
          .select('id', '_col1', '_col2', 'col3', 'col4')
          .toDF('id', 'col1', 'col2', 'col3', 'col4'))
PySpark - Adding a Column from a list of values
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
#sample data
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
["Animal", "Enemy"])
a.show()
#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
#add 'sequential' index and join both dataframe to get the final result
a = a.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
final_df = a.join(b, a.row_idx == b.row_idx).drop("row_idx")
final_df.show()
Input:
+------+-----+
|Animal|Enemy|
+------+-----+
| Dog| Cat|
| Cat| Dog|
| Mouse| Cat|
+------+-----+
Output is:
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
| Cat| Dog| 4|
| Dog| Cat| 5|
| Mouse| Cat| 1|
+------+-----+------+
How to concatenate two dataframes without duplicates?
The simplest way is to just do the concatenation, and then drop duplicates.
>>> df1
A B
0 1 2
1 3 1
>>> df2
A B
0 5 6
1 3 1
>>> pandas.concat([df1,df2]).drop_duplicates().reset_index(drop=True)
A B
0 1 2
1 3 1
2 5 6
The reset_index(drop=True) is there to fix up the index after the concat() and drop_duplicates(). Without it you will have an index of [0, 1, 0] instead of [0, 1, 2], which could cause problems for further operations on this dataframe down the road if it isn't reset right away.
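If only some columns should define what counts as a duplicate, drop_duplicates also takes subset and keep arguments. A small sketch with made-up data:

```python
import pandas as pd

df1 = pd.DataFrame({"A": [1, 3], "B": [2, 1]})
df2 = pd.DataFrame({"A": [5, 3], "B": [6, 9]})

# treat rows as duplicates whenever column A matches, keeping the first occurrence
combined = (pd.concat([df1, df2])
            .drop_duplicates(subset=["A"], keep="first")
            .reset_index(drop=True))
```

Here the row (3, 9) from df2 is dropped even though its B value differs, because its A value already appeared in df1.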
Appending two dataframes with same columns, different order
You could also use pd.concat:
In [36]: pd.concat([noclickDF, clickDF], ignore_index=True)
Out[36]:
click id location
0 0 123 321
1 0 1543 432
2 1 421 123
3 1 436 1543
Under the hood, DataFrame.append calls pd.concat. DataFrame.append has code for handling various types of input, such as Series, tuples, lists and dicts; if you pass it a DataFrame, it passes straight through to pd.concat, so using pd.concat is a bit more direct. (Note that DataFrame.append was deprecated in pandas 1.4 and removed in pandas 2.0, so pd.concat is now the way to go.)
Join condition on data frames with list as entries
What about the approach below?
Cross join both data frames, add a column using the array_intersect function, and then filter the joined dataset to rows where the size of the intersection column is greater than 0.
For example:
df1 = spark.read # ... Read your first source
df2 = spark.read # ... Read your other source
from pyspark.sql import functions as fn
joined = df1.crossJoin(df2). \
withColumn("common_join_keys", fn.array_intersect(fn.col("joinkey1"), fn.col("joinkey2")))
result = joined.filter(fn.size(fn.col("common_join_keys")) > 0) # your condition
result.show(truncate=False)
How to join/merge a list of dataframes with common keys in PySpark?
You can join a list of dataframes by folding over it with reduce. Below is a simple example (in Scala):
import spark.implicits._
val df1 = spark.sparkContext.parallelize(Seq(
(0,"John",3),
(1,"Paul",4),
(2,"George",5)
)).toDF("id", "uid1", "var1")
val df2 = spark.sparkContext.parallelize(Seq(
(0,"John",23),
(1,"Paul",44),
(2,"George",52)
)).toDF("id", "uid1", "var2")
val df3 = spark.sparkContext.parallelize(Seq(
(0,"John",31),
(1,"Paul",45),
(2,"George",53)
)).toDF("id", "uid1", "var3")
val df = List(df1, df2, df3)
df.reduce((a,b) => a.join(b, Seq("id", "uid1")))
Output:
+---+------+----+----+----+
| id| uid1|var1|var2|var3|
+---+------+----+----+----+
| 1| Paul| 4| 44| 45|
| 2|George| 5| 52| 53|
| 0| John| 3| 23| 31|
+---+------+----+----+----+
Hope this helps!