java.lang.ClassCastException using lambda expressions in spark job on remote server
What you have here is a follow-up error which masks the original error.

When lambda instances are serialized, they use writeReplace to dissolve their JRE-specific implementation from the persistent form, which is a SerializedLambda instance. When the SerializedLambda instance has been restored, its readResolve method will be invoked to reconstitute the appropriate lambda instance. As the documentation says, it will do so by invoking a special method of the class which defined the original lambda (see also this answer). The important point is that the original class is needed, and that's what's missing in your case.
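To see that machinery working when the defining class *is* available, here is a minimal round-trip sketch (the class and method names below are illustrative, not from the question):

```java
import java.io.*;
import java.util.function.Supplier;

public class LambdaRoundTrip {
    // Serializes a lambda and reads it back. This succeeds because this
    // defining class is on the classpath of the deserializing JVM.
    static String roundTrip() {
        // The intersection cast makes the lambda serializable, so the compiler
        // generates a writeReplace() that returns a SerializedLambda.
        Supplier<String> s = (Supplier<String> & Serializable) () -> "ok";
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(s); // actually writes the SerializedLambda form
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                // SerializedLambda.readResolve() calls back into this class's
                // synthetic $deserializeLambda$ method to rebuild the lambda,
                // which is exactly why the defining class must be present.
                @SuppressWarnings("unchecked")
                Supplier<String> restored = (Supplier<String>) ois.readObject();
                return restored.get();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "ok"
    }
}
```

Remove the defining class from the classpath between writing and reading, and the readResolve step can no longer find $deserializeLambda$, which is the situation described above.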
But there's a …special… behavior of the ObjectInputStream. When it encounters an exception, it doesn't bail out immediately. It will record the exception and continue the process, marking all objects currently being read, and thus depending on the erroneous object, as being erroneous as well. Only at the end of the process will it throw the original exception it encountered. What makes it so strange is that it will also continue trying to set the fields of these objects. But when you look at the method ObjectInputStream.readOrdinaryObject, line 1806:
…
    if (obj != null &&
        handles.lookupException(passHandle) == null &&
        desc.hasReadResolveMethod())
    {
        Object rep = desc.invokeReadResolve(obj);
        if (unshared && rep.getClass().isArray()) {
            rep = cloneArray(rep);
        }
        if (rep != obj) {
            handles.setObject(passHandle, obj = rep);
        }
    }

    return obj;
}
you see that it doesn't call the readResolve method when lookupException reports a non-null exception. But when the substitution did not happen, it's not a good idea to continue trying to set the field values of the referrer. Yet that's exactly what happens here, hence producing a ClassCastException.
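The same field-assignment failure can be provoked without deleting any class file: if a readResolve substitution leaves an object of an incompatible type in the stream, FieldReflector.setObjFieldValues fails just like in the stack trace further below. The classes in this sketch are made up for the demonstration:

```java
import java.io.*;

public class BadResolve {
    // Hypothetical class for the demonstration: its readResolve deliberately
    // substitutes an incompatible object, mimicking a SerializedLambda that
    // was never replaced by the real lambda instance.
    static class Inner implements Runnable, Serializable {
        @Override public void run() {}
        private Object readResolve() { return "not a Runnable"; }
    }

    static class Holder implements Serializable {
        Runnable r = new Inner();
    }

    // Serializes a Holder and reads it back; returns the class of the
    // exception thrown during deserialization (a ClassCastException raised
    // while assigning the String to the Runnable field r).
    static Class<? extends Throwable> provoke() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(new Holder());
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                ois.readObject();
            }
            return null; // not reached: the field assignment fails
        } catch (Exception e) {
            return e.getClass();
        }
    }

    public static void main(String[] args) {
        System.out.println(provoke());
    }
}
```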
You can easily reproduce the problem:
public class Holder implements Serializable {
    Runnable r;
}

public class Defining {
    public static Holder get() {
        final Holder holder = new Holder();
        holder.r = (Runnable & Serializable) () -> {};
        return holder;
    }
}

public class Writing {
    static final File f = new File(System.getProperty("java.io.tmpdir"), "x.ser");

    public static void main(String... arg) throws IOException {
        try(FileOutputStream os = new FileOutputStream(f);
            ObjectOutputStream oos = new ObjectOutputStream(os)) {
            oos.writeObject(Defining.get());
        }
        System.out.println("written to " + f);
    }
}

public class Reading {
    static final File f = new File(System.getProperty("java.io.tmpdir"), "x.ser");

    public static void main(String... arg) throws IOException, ClassNotFoundException {
        try(FileInputStream is = new FileInputStream(f);
            ObjectInputStream ois = new ObjectInputStream(is)) {
            Holder h = (Holder) ois.readObject();
            System.out.println(h.r);
            h.r.run();
        }
        System.out.println("read from " + f);
    }
}
Compile these four classes and run Writing. Then delete the class file Defining.class and run Reading. You will get an
Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
(Tested with 1.8.0_20)
The bottom line is that you may forget about this serialization issue once you understand what's happening. All you have to do to solve your problem is to make sure that the class which defined the lambda expression is also available in the runtime where the lambda is deserialized.
Example for a Spark job run directly from the IDE (spark-submit distributes the jar by default):

SparkConf sconf = new SparkConf()
    .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory")
    .set("spark.eventLog.enabled", "true")
    .setJars(new String[]{"/path/to/jar/with/your/class.jar"})
    .setMaster("spark://spark.standalone.uri:7077");
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD

In my case the program itself worked perfectly; the issue was in building the jar. So don't doubt the program, just focus on whether the jar is getting built properly or not.
I am using IntelliJ and was building the artifact from the Build option; I think that is why the jar was not getting built properly, since it is a Maven project. When I built with Maven instead, the jar was built properly and the program ran smoothly.
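If you hit the same jar-building problem, one sanity check is to build on the command line with Maven and submit that artifact directly; spark-submit ships the application jar to the executors, so the lambda-defining classes are available at deserialization time. Paths, the main class name, and the master URL below are placeholders:

```shell
# Build the jar with Maven instead of the IDE's artifact builder
mvn clean package

# Submit the freshly built jar; spark-submit distributes it to the executors
spark-submit --class com.example.YourMain \
  --master spark://spark.standalone.uri:7077 \
  target/your-app-1.0.jar
```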