Passing a data frame column and external list to udf under withColumn
The cleanest solution is to pass additional arguments using a closure:

from pyspark.sql.functions import udf, col

def make_topic_word(topic_words):
    return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
    .show())
This doesn't require any changes in keyword_list or the function you wrap with the UDF. You can also use this method to pass an arbitrary object, for example a list of sets for efficient lookups.
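For example, a minimal sketch of the list-of-sets idea (the keyword groups and the make_topic_matcher name are made up for illustration):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, IntegerType

# Hypothetical keyword groups; sets give O(1) membership tests inside the UDF
keyword_sets = [{"union", "merger"}, {"strike", "walkout"}]

def make_topic_matcher(topic_sets):
    # topic_sets is captured by the closure, not passed as a column
    return udf(lambda tokens: [i for i, s in enumerate(topic_sets)
                               if any(t in s for t in tokens)],
               ArrayType(IntegerType()))

df.withColumn("matched", make_topic_matcher(keyword_sets)(col("tokens"))).show()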
If you want to use your current UDF and pass topic_words directly, you'll have to convert it to a column literal first:
from pyspark.sql.functions import array, lit
ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()
Depending on your data and requirements, there can be alternative, more efficient solutions which don't require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).
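A rough sketch of the explode-based route, assuming the tokens column and keyword_list from above:

from pyspark.sql.functions import explode, col, collect_set, monotonically_increasing_id

# Flatten the nested keyword list so membership can be tested with isin()
flat_keywords = [k for ks in keyword_list for k in ks]

matched = (df
    .withColumn("row_id", monotonically_increasing_id())  # key to re-group by
    .withColumn("token", explode(col("tokens")))          # one row per token
    .where(col("token").isin(flat_keywords))              # keep keyword hits only
    .groupBy("row_id")
    .agg(collect_set("token").alias("topics")))           # collapse back to one row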
How to pass an extra argument to UDF using withColumn
UDFs can only see row elements, so to pass a fixed argument you have to use the lit() function. Your udf definition also has to be corrected. Try this:
import pyspark.sql.functions as F
from pyspark.sql.types import *

df = spark.createDataFrame([
    ["aaa", "1"],
    ["bbb", "2"],
    ["ccc", "5"]
]).toDF("text", "id")

def title(x, y):
    if y:
        x = x.title()
    return x

title_udf = F.udf(title, StringType())

df = df.withColumn('text_title', title_udf('text', F.lit(True)))
df.show()
+----+---+----------+
|text| id|text_title|
+----+---+----------+
| aaa| 1| Aaa|
| bbb| 2| Bbb|
| ccc| 5| Ccc|
+----+---+----------+
As indicated by @powers in the comments, if this output is your ultimate goal, then you can do it without a udf, using the initcap() function:
df = df.withColumn("text_title",F.when(F.lit(True),F.initcap(F.col('text'))).otherwise(F.col('text')))
You can also use other columns as the condition, such as the 'id' column:
df = df.withColumn("text_title",F.when(F.col('id')>2,F.initcap(F.col('text'))).otherwise(F.col('text')))
How to pass data into udf not from row
You probably want to pass an external map parameter to your UDF, and broadcast this map to each machine to avoid shipping a copy of it with every task.
For your UDF, you can do something along those lines:

def yourUDF(rulesMap: Map[String, XXX]): UserDefinedFunction = udf {
  (body: YYY, ruleId: String) => applyYourRules(body, rulesMap(ruleId))
} // XXX and YYY are the types you need, I don't know the problem you're trying to solve
And as your map is quite large, you can avoid duplicating it for every task by broadcasting it (you can access a broadcast variable with variable.value):
val rulesMapBroadcasted = spark.sparkContext.broadcast(rulesMap)
df.withColumn("new", yourUDF(rulesMap = rulesMapBroadcasted.value)(col("body"), col("ruleId")))
A broadcast variable is a read-only variable duplicated only once per machine (in comparison, a classical variable is duplicated once per task), so this is a perfect fit for a large lookup table.
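The same pattern in PySpark would look roughly like this; rules_map and apply_rules are hypothetical stand-ins:

from pyspark.sql.functions import udf, col

# Hypothetical large lookup table, shipped once per executor via broadcast
rules_map_bc = spark.sparkContext.broadcast(rules_map)

def make_rules_udf():
    # .value is resolved inside the lambda, i.e. on the executors
    return udf(lambda body, rule_id: apply_rules(body, rules_map_bc.value[rule_id]))

df.withColumn("new", make_rules_udf()(col("body"), col("ruleId")))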
Use WithColumn with external function
You can create a UDF from the external method similar to the following (illustrated using the Scala REPL):
// From a Linux shell prompt:
vi MyJava.java
public class MyJava {
    public Double calculateExpense(Double pexpense, Double cexpense) {
        return pexpense + cexpense;
    }
}
:wq
javac MyJava.java
jar -cvf MyJava.jar MyJava.class
spark-shell --jars /path/to/jar/MyJava.jar
// From within the Spark shell
val df = Seq(
  ("1", "1.0", "2.0"), ("2", "3.0", "4.0")
).toDF("employeeid", "pexpense", "cexpense")

val myJava = new MyJava

val myJavaUdf = udf(myJava.calculateExpense _)

val df2 = df.withColumn("totalexpense", myJavaUdf($"pexpense", $"cexpense"))
df2.show
+----------+--------+--------+------------+
|employeeid|pexpense|cexpense|totalexpense|
+----------+--------+--------+------------+
| 1| 1.0| 2.0| 3.0|
| 2| 3.0| 4.0| 7.0|
+----------+--------+--------+------------+
Creating/Registering a PySpark UDF and apply it to one column
Use

spark.udf.register("squaredWithPython", squared)

if you want to call the UDF from SQL, like this:

%sql select id, squaredWithPython(id) as id_squared from test

Use

squared_udf = udf(squared, LongType())

if you work with the DataFrame API, like this:

display(df.select("id", squared_udf("id").alias("id_squared")))

That's all, but these things are not always clearly explained in the manuals.
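Putting it together, a minimal end-to-end sketch (the squared function and the test table are assumptions based on the snippets above; .show() stands in for display()):

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

def squared(s):
    return s * s

df = spark.range(1, 5)                # stand-in DataFrame with a single 'id' column
df.createOrReplaceTempView("test")    # needed for the SQL variant

spark.udf.register("squaredWithPython", squared, LongType())  # SQL-side registration
squared_udf = udf(squared, LongType())                        # DataFrame-side wrapper

spark.sql("select id, squaredWithPython(id) as id_squared from test").show()
df.select("id", squared_udf("id").alias("id_squared")).show()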
Passing column to when function in pyspark
Thanks for your replies.
I've found the solution by converting the column to pandas:

stList = list(plants.select(F.col('station')).toPandas()['station'])

and then using:

F.when(stations.station.isNull(), F.array([F.lit(x) for x in stList])).otherwise(stations['station']).alias('station')

which directly gives an array.
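If you'd rather avoid the pandas round-trip, collecting the rows yields the same list; a small alternative sketch:

stList = [row['station'] for row in plants.select('station').collect()]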
PySpark - Pass list as parameter to UDF
from pyspark.sql.functions import udf, col

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)], ["Letter", "distances"])
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(feature, label_list):
    # map a distance value to a label from the externally supplied list
    if feature == 0:
        return label_list[4]
    else:  # without an 'else' branch, 'null' would be returned in this case
        return 'I am not sure!'

def udf_score(label_list):
    return udf(lambda l: cate(l, label_list))

a.withColumn("category", udf_score(label_list)(col("distances"))).show()
Output is:
+------+---------+--------------+
|Letter|distances| category|
+------+---------+--------------+
| A| 20|I am not sure!|
| B| 30|I am not sure!|
| D| 80|I am not sure!|
+------+---------+--------------+