Calling Java/Scala Function from a Task

Communication using the default Py4J gateway is simply not possible. To understand why, we have to take a look at the following diagram from the PySpark Internals document [1]:

[Diagram: PySpark internals — the Py4J gateway connects the Python driver to the JVM, while Python workers communicate with JVM executors through sockets]

Since the Py4J gateway runs on the driver, it is not accessible to the Python interpreters, which communicate with JVM workers through sockets (see for example PythonRDD / rdd.py).

Theoretically it would be possible to create a separate Py4J gateway for each worker, but in practice it is unlikely to be useful. Issues like reliability aside, Py4J is simply not designed to perform data-intensive tasks.

Are there any workarounds?

  1. Using the Spark SQL Data Sources API to wrap JVM code.

    Pros: Supported, high level, doesn't require access to the internal PySpark API

    Cons: Relatively verbose and not very well documented, limited mostly to input data

  2. Operating on DataFrames using Scala UDFs (a minimal sketch of this approach follows the list below).

    Pros: Easy to implement (see Spark: How to map Python with Scala or Java User Defined Functions?), no data conversion between Python and Scala if data is already stored in a DataFrame, minimal access to Py4J

    Cons: Requires access to Py4J gateway and internal methods, limited to Spark SQL, hard to debug, not supported

  3. Creating a high-level Scala interface in a similar way to how it is done in MLlib.

    Pros: Flexible, ability to execute arbitrarily complex code. It can be done either directly on RDDs (see for example MLlib model wrappers) or with DataFrames (see How to use a Scala class inside Pyspark). The latter solution seems to be much more friendly since all ser-de details are already handled by the existing API.

    Cons: Low level, requires data conversion and, like UDFs, access to Py4J and the internal API; not supported

    Some basic examples can be found in Transforming PySpark RDD with Scala

  4. Using an external workflow management tool to switch between Python and Scala/Java jobs, passing data through a DFS.

    Pros: Easy to implement, minimal changes to the code itself

    Cons: Cost of reading / writing data (Alluxio?)

  5. Using a shared SQLContext (see for example Apache Zeppelin or Livy) to pass data between guest languages using registered temporary tables.

    Pros: Well suited for interactive analysis

    Cons: Not so well suited for batch jobs (Zeppelin); may require additional orchestration (Livy)
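
For option 2, a minimal sketch of the Scala side, assuming Spark with a shared SparkSession (the object name, UDF name and logic below are made up for illustration):

import org.apache.spark.sql.SparkSession

// Registers a Scala UDF so that PySpark can call it through Spark SQL.
object ScalaUdfs {
  def register(spark: SparkSession): Unit = {
    spark.udf.register("normalize", (s: String) => s.trim.toLowerCase)
  }
}

On the Python side the registration can be triggered once through the Py4J gateway (for example via sc._jvm), and the function is then usable in SQL expressions such as SELECT normalize(name) FROM people, so the data itself never has to cross the Python/JVM boundary.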


  [1] Joshua Rosen. (2014, August 4). PySpark Internals. Retrieved from https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

Calling a Scala function from Java

Since process in your example is a function of type String => String, to invoke it you need to call apply() on it.

Scala by default invokes apply when client code uses parentheses (), but Java won't recognise that.

object Path {
  def process: String => String = (input: String) => "processed"
}

scala> Path.process("some value")
res88: String = processed

scala> Path.process.apply("some value")
res89: String = processed

NOTE: The expanded version of the Scala function is:

def process = new Function1[String, String] {
  override def apply(input: String) = "processed"
}

invoking from java,

public class Test {

    public static void main(String[] args) {
        // process() is called through the static forwarder generated for the Scala object Path
        System.out.println(Path.process().apply("some input")); // prints processed
    }
}

Calling a Java method that receives a variable number of parameters from Scala

Try

def info(message: String, any: Any*): Unit = {
  LOGGER.info(message, any.asInstanceOf[Seq[Object]]: _*)
}

or

def info(message: String, any: AnyRef*): Unit = {
  LOGGER.info(message, any: _*)
}

The second version avoids the cast, but it is not applicable to primitive types.
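
As a rough illustration of how the two variants differ at the call site, here is a minimal, hypothetical sketch using an SLF4J logger (the object name and messages are made up):

import org.slf4j.{Logger, LoggerFactory}

object LogDemo {
  private val LOGGER: Logger = LoggerFactory.getLogger("LogDemo")

  // Variant 1: Any* accepts primitives; they are boxed when packed into the Seq,
  // so the asInstanceOf cast is safe at runtime.
  def info(message: String, any: Any*): Unit =
    LOGGER.info(message, any.asInstanceOf[Seq[Object]]: _*)

  // Variant 2: AnyRef* needs no cast, but primitives must be boxed by the caller.
  def infoRef(message: String, any: AnyRef*): Unit =
    LOGGER.info(message, any: _*)

  def main(args: Array[String]): Unit = {
    info("user {} retried {} times", "alice", 3)              // Int boxed automatically
    infoRef("user {} retried {} times", "alice", Int.box(3))  // explicit boxing required
  }
}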

Pass Java-method-call to Scala class/method

It does not compile because you're not passing the method to ScalaButton.create; you're passing the result of the method call, which is void.

If you want to pass a function to Scala from Java, you need to construct an instance of - in this case - AbstractFunction0<BoxedUnit>, which corresponds to () => Unit.

To do this, you need to:

import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

and then:

buttonOne.create("Label", new AbstractFunction0<BoxedUnit>() {
    @Override
    public BoxedUnit apply() {
        Controller.doSomething();
        return BoxedUnit.UNIT;
    }
});

There is also a Function0<BoxedUnit>, but you don't want to use it - it's not intended to be constructed in Java.

As you can see it's not exactly straightforward to use. If you're using Java 8 though, you can streamline this a bit.

You'd need to define a function like this one:

private static Function0<BoxedUnit> getBoxedUnitFunction0(Runnable f) {
    return new AbstractFunction0<BoxedUnit>() {

        @Override
        public BoxedUnit apply() {
            f.run();
            return BoxedUnit.UNIT;
        }
    };
}

Don't forget to import scala.Function0 - here it is only referenced, not constructed, so it's OK. Now you can use it like this:

buttonOne.create("Label", getBoxedUnitFunction0(Controller::doSomething));
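
The Scala side is not shown in the question; a plausible sketch of what create might look like (the class name comes from the question, the body is made up):

class ScalaButton {
  // The callback parameter is a () => Unit, which Java sees as Function0<BoxedUnit>.
  def create(label: String, onClick: () => Unit): Unit = {
    println(s"created button '$label'")
    onClick() // run the callback supplied from Java
  }
}

With Scala 2.12 or later, Function0 is compiled as a functional interface, so a plain Java lambda returning BoxedUnit.UNIT can also be passed directly instead of subclassing AbstractFunction0.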

Using Scala from Java: passing functions as parameters

You have to manually instantiate a Function1 in Java. Something like:

final Function1<Integer, String> f = new Function1<Integer, String>() {
    public int $tag() {
        return Function1$class.$tag(this);
    }

    public <A> Function1<A, String> compose(Function1<A, Integer> f) {
        return Function1$class.compose(this, f);
    }

    public String apply(Integer someInt) {
        return myFunc(someInt);
    }
};
MyScala.setFunc(f);

This is taken from Daniel Spiewak’s “Interop Between Java and Scala” article.
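
Note that this snippet targets an older Scala release; $tag and Function1$class no longer exist in current versions, where a Java lambda can implement Function1 directly. The Scala side it assumes is not shown; a minimal guess (the object and method names come from the snippet, the body is made up) might be:

object MyScala {
  // java.lang.Integer is used so that Java sees Function1<Integer, String>;
  // a scala.Int type parameter would erase to Object in this generic position.
  def setFunc(f: java.lang.Integer => String): Unit = {
    println(f(42))
  }
}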

How to call Scala curried functions from Java with generics

For

private <T> T internalCall(TransactionWithResult<T> data) {
    return null;
}

private void internalCall2(TransactionWithoutResult data) {
}

try

public <T> T call(Data<T> data) {
    RetryUtil.retry(3, new FiniteDuration(1, TimeUnit.MINUTES), () -> {
        internalCall2(data);
        return null;
    });

    return RetryUtil.retry(3, new FiniteDuration(1, TimeUnit.MINUTES), () -> internalCall(data));
}

Parameters from multiple parameter lists in Scala are seen in Java as parameters of a single parameter list.

Scala and Java functions are interchangeable since Scala 2.12 (see How to use Java lambdas in Scala, https://stackoverflow.com/a/47381315/5249621).

By-name parameters (=> T) are seen from Java as no-arg functions (() => T, i.e. Function0<T>).

I assumed that Data implements both TransactionWithResult and TransactionWithoutResult, so it can be used where either one is expected; otherwise the code needs to be adjusted.
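
For reference, a sketch of the RetryUtil.retry signature the answer assumes - curried parameter lists plus a by-name block (the body below is a made-up placeholder, not the real utility):

import scala.concurrent.duration.FiniteDuration

object RetryUtil {
  // From Java this is seen as a single method:
  //   <T> T retry(int maxAttempts, FiniteDuration delay, Function0<T> block)
  // which is why the lambdas in the Java code above type-check (Scala 2.12+).
  def retry[T](maxAttempts: Int, delay: FiniteDuration)(block: => T): T = {
    // A real implementation would loop, handle failures and sleep between attempts;
    // this placeholder just evaluates the block once.
    block
  }
}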


