Calling Java/Scala function from a task
Communication using default Py4J gateway is simply not possible. To understand why we have to take a look at the following diagram from the PySpark Internals document [1]:
Since Py4J gateway runs on the driver it is not accessible to Python interpreters which communicate with JVM workers through sockets (See for example PythonRDD
/ rdd.py
).
Theoretically it could be possible to create a separate Py4J gateway for each worker but in practice it is unlikely to be useful. Ignoring issues like reliability Py4J is simply not designed to perform data intensive tasks.
Are there any workarounds?
Using Spark SQL Data Sources API to wrap JVM code.
Pros: Supported, high level, doesn't require access to the internal PySpark API
Cons: Relatively verbose and not very well documented, limited mostly to the input data
Operating on DataFrames using Scala UDFs.
Pros: Easy to implement (see Spark: How to map Python with Scala or Java User Defined Functions?), no data conversion between Python and Scala if data is already stored in a DataFrame, minimal access to Py4J
Cons: Requires access to Py4J gateway and internal methods, limited to Spark SQL, hard to debug, not supported
Creating high level Scala interface in a similar way how it is done in MLlib.
Pros: Flexible, ability to execute arbitrary complex code. It can be don either directly on RDD (see for example MLlib model wrappers) or with
DataFrames
(see How to use a Scala class inside Pyspark). The latter solution seems to be much more friendly since all ser-de details are already handled by existing API.Cons: Low level, required data conversion, same as UDFs requires access to Py4J and internal API, not supported
Some basic examples can be found in Transforming PySpark RDD with Scala
Using external workflow management tool to switch between Python and Scala / Java jobs and passing data to a DFS.
Pros: Easy to implement, minimal changes to the code itself
Cons: Cost of reading / writing data (Alluxio?)
Using shared
SQLContext
(see for example Apache Zeppelin or Livy) to pass data between guest languages using registered temporary tables.Pros: Well suited for interactive analysis
Cons: Not so much for batch jobs (Zeppelin) or may require additional orchestration (Livy)
- Joshua Rosen. (2014, August 04) PySpark Internals. Retrieved from https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
Call function from Scala in Java
Since process
in your example is a function of type String => String
, To invoke a function, you need to call apply()
on it.
Scala by default invokes apply
when client code uses parenthsis ()
, but java won't recognise that.
object Path {
def process: String => String = (input: String) => "processed"
}
scala> process("some value")
res88: String = processed
scala> process.apply("some value")
res89: String = processed
NOTE: The expanded version of scala function is,
def process = new Function[String, String] {
override def apply(input: String) = "processed"
}
invoking from java,
public class Test {
public static void main(String[] args) {
Path f = new Path();
System.out.println(f.process().apply("som input")); //prints processed
}
}
Calling Java method that receive variable amount of parameters from Scala
Try
def info(message: String, any: Any*): Unit = {
LOGGER.info(message, any.asInstanceOf[Seq[Object]]: _*)
}
or
def info(message: String, any: AnyRef*): Unit = {
LOGGER.info(message, any: _*)
}
without casting but not applicable to primitive types.
Pass Java-method-call to Scala class/method
It does not compile since you're not passing the method to ScalaButton.create
, you're passing the result of the method call, which is void
.
If you want to pass a function to Scala from Java, you need to construct an instance of - in this case - AbstractFunction0<BoxedUnit>
, which corresponds to () => Unit
.
To do this, you need to:
import scala.AbstractFunction0
import scala.runtime.BoxedUnit
and then:
buttonOne.create("Label", new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
Controller.doSomething()
return BoxedUnit.UNIT;
}
});
There is also a Function0<BoxedUnit>
, but you don't want to use it - it's not intended to be constructed in Java.
As you can see it's not exactly straightforward to use. If you're using Java 8 though, you can streamline this a bit.
You'd need to define a function like this one:
private static Function0<BoxedUnit> getBoxedUnitFunction0(Runnable f) {
return new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
f.run();
return BoxedUnit.UNIT;
}
};
}
Don't forget to import scala.Function0
- here it's not constructed, so it's OK. Now you can use it like that:
buttonOne.create("Label", getBoxedUnitFunction0(Controller::doSomething));
Using Scala from Java: passing functions as parameters
You have to manually instantiate a Function1
in Java. Something like:
final Function1<Integer, String> f = new Function1<Integer, String>() {
public int $tag() {
return Function1$class.$tag(this);
}
public <A> Function1<A, String> compose(Function1<A, Integer> f) {
return Function1$class.compose(this, f);
}
public String apply(Integer someInt) {
return myFunc(someInt);
}
};
MyScala.setFunc(f);
This is taken from Daniel Spiewak’s “Interop Between Java and Scala” article.
How to call Scala curry functions from Java with Generics
For
private <T> T internalCall(TransactionWithResult<T> data) {
return null;
}
private void internalCall2(TransactionWithoutResult data) {
}
try
public <T> T call(Data<T> data) {
RetryUtil.retry(3, new FiniteDuration(1, TimeUnit.MINUTES), () -> { internalCall2(data); return null; });
return RetryUtil.retry(3, new FiniteDuration(1, TimeUnit.MINUTES), () -> internalCall(data));
}
Parameters from multiple parameter lists in Scala should be seen in Java as parameters of a single parameter list.
Scala and Java functions should be interchangeable (since Scala 2.12)
How to use Java lambdas in Scala (https://stackoverflow.com/a/47381315/5249621)
By-name parameters => T
should be seen as no-arg functions () => T
.
I assumed that Data
implements TransactionWithResult
and TransactionWithoutResult
, so Data
can be used where TransactionWithResult
or TransactionWithoutResult
is expected, otherwise the code should be fixed.
Related Topics
Performance of Pandas Apply VS Np.Vectorize to Create New Column from Existing Columns
Python in Raw Mode Stdin Print Adds Spaces
Create a .CSV File with Values from a Python List
How to Read a File with a Semi Colon Separator in Pandas
Pandas Dataframe Get First Row of Each Group
Typeerror: a Bytes-Like Object Is Required, Not 'Str'
How to Get Current Available Gpus in Tensorflow
What Do Square Brackets, "[]", Mean in Function/Class Documentation
Iterate Over Model Instance Field Names and Values in Template
Paramiko Error When Trying to Edit File: "Sudo: No Tty Present and No Askpass Program Specified"
Move Files from One Directory to Another with Paramiko
Convert String in Base64 to Image and Save on Filesystem
Python: Calling 'List' on a Map Object Twice
Importerror: No Module Named Pil