Passing a DataFrame Column and External List to UDF Under withColumn

The cleanest solution is to pass additional arguments using a closure:

from pyspark.sql.functions import udf, col

def make_topic_word(topic_words):
    return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
    .show())

This doesn't require any changes in keyword_list or in the function you wrap with the UDF. You can also use this method to pass an arbitrary object, for example a list of sets for efficient lookups, as sketched below.
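A minimal sketch of that list-of-sets idea, where keyword_sets, make_matcher and the membership test are illustrative assumptions rather than code from the original answer:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

# Hypothetical lookup structure: one set of keywords per topic, built once on the driver.
keyword_sets = [{"union", "merger"}, {"profit", "revenue"}]

def make_matcher(sets):
    # The sets travel inside the closure; each membership test is O(1).
    return udf(lambda tokens: any(t in s for s in sets for t in tokens),
               BooleanType())

df.withColumn("matches", make_matcher(keyword_sets)(col("tokens"))).show()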

If you want to use your current UDF and pass topic_words directly, you'll have to convert it to a column literal first:

from pyspark.sql.functions import array, lit

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()

Depending on your data and requirements, there can be alternative, more efficient solutions that don't require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).
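A rough sketch of the explode + aggregate route (the keywords_df lookup table and the surrounding names are assumptions, not code from the answer):

from pyspark.sql.functions import (explode, col, collect_set, first,
                                   monotonically_increasing_id)

# Flatten keyword_list into a (keyword, topic) lookup DataFrame, explode the
# tokens, join against it, and collapse the matched topics back per row.
keywords_df = spark.createDataFrame(
    [(kw, i) for i, ks in enumerate(keyword_list) for kw in ks],
    ["keyword", "topic"])

(df.withColumn("id", monotonically_increasing_id())
    .withColumn("token", explode(col("tokens")))
    .join(keywords_df, col("token") == col("keyword"), "left")
    .groupBy("id")
    .agg(first("tokens").alias("tokens"),
         collect_set("topic").alias("topics"))
    .show())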

How to pass an extra argument to UDF using withColumn

UDFs can only recognize row elements, so to pass a fixed argument you have to wrap it with the lit() function. Your udf definition also has to be corrected. Try this:

import pyspark.sql.functions as F
from pyspark.sql.types import *

df = spark.createDataFrame([
    ["aaa", "1"],
    ["bbb", "2"],
    ["ccc", "5"]
]).toDF("text", "id")

def title(x, y):
    if y:
        x = x.title()
    return x

title_udf = F.udf(title, StringType())

df = df.withColumn('text_title', title_udf('text', F.lit(True)))

df.show()
+----+---+----------+
|text| id|text_title|
+----+---+----------+
| aaa|  1|       Aaa|
| bbb|  2|       Bbb|
| ccc|  5|       Ccc|
+----+---+----------+

As indicated by @powers in the comments, if this output is your ultimate purpose, then you can do this without a udf, using the initcap() function:

df = df.withColumn("text_title",F.when(F.lit(True),F.initcap(F.col('text'))).otherwise(F.col('text')))

You can also use other columns as the condition, such as the 'id' column:

df = df.withColumn("text_title",F.when(F.col('id')>2,F.initcap(F.col('text'))).otherwise(F.col('text')))

How to pass data into a UDF not from a row

You probably want to pass an external map as a parameter to your UDF, and broadcast this map to each machine to avoid having a copy of it for each task.

For your UDF, you can do something along those lines:

def yourUDF(rulesMap: Map[String, XXX]): UserDefinedFunction = udf {
  (body: YYY, ruleId: String) => applyYourRules(body, rulesMap(ruleId))
} // XXX and YYY are the types you need, I don't know the problem you're trying to solve

And as your map is quite large, you can avoid duplicating it for every task by broadcasting the variable (you can access a broadcast variable's content with variable.value):

val rulesMapBroadcasted = spark.sparkContext.broadcast(rulesMap)
df.withColumn("new", yourUDF(rulesMap = rulesMapBroadcasted.value)(col("body"), col("ruleId")))

A broadcast variable is a read-only variable duplicated only once per machine (in comparison, a classical variable is duplicated once per task), so this is a perfect fit for a large lookup table.
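The same pattern works in PySpark; a sketch follows, where rules_map and apply_rules stand in for your own data and logic:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Ship the map to each executor once, instead of once per task.
rules_map_bc = spark.sparkContext.broadcast(rules_map)

@udf(returnType=StringType())
def apply_rule(body, rule_id):
    # .value reads the executor-local broadcast copy.
    return apply_rules(body, rules_map_bc.value[rule_id])

df = df.withColumn("new", apply_rule(col("body"), col("ruleId")))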

Use withColumn with an external function

You can create a UDF from the external method, similar to the following (illustrated using the Scala REPL):

// From a Linux shell prompt:

vi MyJava.java
public class MyJava {
    public Double calculateExpense(Double pexpense, Double cexpense) {
        return pexpense + cexpense;
    }
}
:wq

javac MyJava.java
jar -cvf MyJava.jar MyJava.class

spark-shell --jars /path/to/jar/MyJava.jar

// From within the Spark shell

val df = Seq(
  ("1", "1.0", "2.0"), ("2", "3.0", "4.0")
).toDF("employeeid", "pexpense", "cexpense")

val myJava = new MyJava

val myJavaUdf = udf(
  myJava.calculateExpense _
)

val df2 = df.withColumn("totalexpense", myJavaUdf($"pexpense", $"cexpense"))

df2.show
+----------+--------+--------+------------+
|employeeid|pexpense|cexpense|totalexpense|
+----------+--------+--------+------------+
|         1|     1.0|     2.0|         3.0|
|         2|     3.0|     4.0|         7.0|
+----------+--------+--------+------------+

Creating/Registering a PySpark UDF and applying it to one column

  • Use spark.udf.register("squaredWithPython", squared) if you want to call the UDF from SQL, like this: %sql select id, squaredWithPython(id) as id_squared from test

  • Use squared_udf = udf(squared, LongType()) for plain DataFrame usage, like this: display(df.select("id", squared_udf("id").alias("id_squared")))

That's all, but these things are not always clearly explained in the manuals. Both variants are sketched end to end below.
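A minimal runnable sketch of both variants; the squared function and the test view are assumptions reconstructed from the snippets above:

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

def squared(s):
    return s * s

# DataFrame usage: wrap the plain Python function in a udf.
squared_udf = udf(squared, LongType())
df = spark.range(1, 4)
df.select("id", squared_udf("id").alias("id_squared")).show()

# SQL usage: register the function under a name visible to Spark SQL.
spark.udf.register("squaredWithPython", squared, LongType())
df.createOrReplaceTempView("test")
spark.sql("select id, squaredWithPython(id) as id_squared from test").show()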

Passing a column to the when function in PySpark

Thanks for your replies.

I've found the solution by converting the column to pandas.

stList = list(plants.select(F.col('station')).toPandas()['station'])

and then use:

F.when(stations.station.isNull(), F.array([F.lit(x) for x in stList])).otherwise(stations['station']).alias('station')

It directly gives an array.
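If you'd rather skip the pandas round trip, the same list can be built with collect(); an equivalent sketch:

# Equivalent without pandas: collect the column as Row objects and unpack.
stList = [row['station'] for row in plants.select('station').collect()]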

PySpark - Pass list as parameter to UDF

from pyspark.sql.functions import udf, col

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)], ["Letter", "distances"])
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(label, feature_list):
    if feature_list == 0:
        return label[4]
    else:  # you may need to add an 'else' condition as well, otherwise 'null' will be added in this case
        return 'I am not sure!'

def udf_score(label_list):
    return udf(lambda l: cate(l, label_list))

a.withColumn("category", udf_score(label_list)(col("distances"))).show()

Output is:

+------+---------+--------------+
|Letter|distances|      category|
+------+---------+--------------+
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
+------+---------+--------------+


Related Topics



Leave a reply



Submit