How to use foreachPartition in Spark?
I think you have the wrong impression of what BoxedUnit is and therefore insist on using the Scala interface from Java, which is overly complicated due to the amount of hidden Scala machinery that gets exposed to Java. scala.Function1<scala.collection.Iterator<T>, scala.runtime.BoxedUnit> is the Java view of (Iterator[T]) => Unit - a Scala function that takes an Iterator[T] and returns the Unit type. Unit in Scala is the equivalent of Java's void. BoxedUnit is the boxed version of Unit - a heap object holding the singleton unit value in its UNIT member; it is an implementation detail that almost never surfaces in Scala programs. If the dataset is a DataFrame, then T will be org.apache.spark.sql.Row and you need to process Scala iterators over collections of Row objects.
To define something that is scala.Function1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit> in Java, you need to create an instance of AbstractFunction1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit> and override its apply() method, where you must return BoxedUnit.UNIT. You also need to make it serializable, so you usually declare your own class that inherits from AbstractFunction1 and implements Serializable. You may also Java-fy it by exposing a different, more Java-friendly abstract method to be overridden later:
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.collection.JavaConverters;
import java.io.Serializable;
import java.util.Iterator;

// Must be declared abstract because it declares an abstract call() method.
abstract class MyPartitionFunction<T>
        extends AbstractFunction1<scala.collection.Iterator<T>, BoxedUnit>
        implements Serializable {

    @Override
    public BoxedUnit apply(scala.collection.Iterator<T> iterator) {
        // Convert the Scala iterator to a Java one before handing it over.
        call(JavaConverters.asJavaIteratorConverter(iterator).asJava());
        return BoxedUnit.UNIT;
    }

    public abstract void call(Iterator<T> iterator);
}
df.foreachPartition(new MyPartitionFunction<Row>() {
    @Override
    public void call(Iterator<Row> iterator) {
        // java.util.Iterator is not Iterable, so use a while loop
        while (iterator.hasNext()) {
            Row row = iterator.next();
            // do something with the row
        }
    }
});
This is a fair amount of implementation complexity, which is why there is a Java-specific overload that takes a ForeachPartitionFunction<T> instead, and the above code becomes:
import org.apache.spark.sql.Row;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import java.util.Iterator;

df.foreachPartition(new ForeachPartitionFunction<Row>() {
    @Override
    public void call(Iterator<Row> iterator) throws Exception {
        while (iterator.hasNext()) {
            Row row = iterator.next();
            // do something with the row
        }
    }
});
The functionality is exactly the same as with the Scala interface; Apache Spark just does the iterator conversion for you and provides a friendly Java interface that doesn't require you to import and implement Scala types.
That said, I think you have a bit of a misunderstanding of how Spark works. You do not need to use foreachPartition to process streaming data in batches - the streaming engine of Spark does that automatically for you. You write streaming queries that specify transformations and aggregations, and these get applied progressively as new data arrives from the stream.

foreachPartition is a form of foreach reserved for some special batch-processing cases, for example, when you need to perform some expensive object instantiation in the processing function and doing it for each row would incur huge overhead. With foreachPartition your processing function gets called only once per partition, so you can instantiate the expensive objects once and then iterate over the partition's data. This decreases processing time because you do the expensive work just once.
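The per-partition pattern itself has nothing Spark-specific about it. The sketch below simulates it with plain Java iterators and a hypothetical ExpensiveClient class (both are stand-ins, not Spark API), just to show that the costly setup happens once per partition rather than once per row:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PerPartitionDemo {
    // Stand-in for a costly resource (DB connection, HTTP client, ...).
    static class ExpensiveClient {
        static int instantiations = 0;
        ExpensiveClient() { instantiations++; }
        void send(String record) { /* pretend to do I/O */ }
        void close() { }
    }

    // What foreachPartition conceptually does: one call per partition iterator.
    static void processPartition(Iterator<String> partition) {
        ExpensiveClient client = new ExpensiveClient(); // once per partition
        try {
            while (partition.hasNext()) {
                client.send(partition.next()); // once per row
            }
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d", "e"));
        for (List<String> p : partitions) {
            processPartition(p.iterator());
        }
        // 2 partitions, 5 rows: the expensive setup ran only twice.
        System.out.println(ExpensiveClient.instantiations);
    }
}
```

With a plain foreach, the equivalent setup cost would have been paid five times - once per row.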
But then, you cannot even call foreach() or foreachPartition() on a streaming source, as that results in an AnalysisException. Instead, you have to use the foreach() or foreachBatch() methods of DataStreamWriter. DataStreamWriter.foreach() takes an instance of ForeachWriter, while DataStreamWriter.foreachBatch() takes a void function that receives a dataset and the batch ID. ForeachWriter receives an epoch ID in its open() method. Again, foreachBatch() comes in both Scala and Java flavours that are equivalent in functionality, so please use the Java-specific one if you are going to write in Java.
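To make the ForeachWriter lifecycle concrete without pulling in Spark, here is a stand-in class (SinkWriter below is only an illustration, not Spark's org.apache.spark.sql.ForeachWriter) that mirrors its three methods - open, process, close - and how the engine conceptually drives them once per partition and epoch:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ForeachWriterDemo {
    // Stand-in mirroring a ForeachWriter<String>'s lifecycle (illustration only).
    static class SinkWriter {
        final StringBuilder log = new StringBuilder();

        boolean open(long partitionId, long epochId) {
            log.append("open(").append(partitionId).append(",")
               .append(epochId).append(") ");
            return true; // returning false would mean "skip this partition"
        }
        void process(String value) {
            log.append("process(").append(value).append(") ");
        }
        void close(Throwable error) {
            log.append("close");
        }
    }

    // How the engine conceptually drives the writer for one partition of one epoch.
    static String runPartition(SinkWriter writer, long partitionId, long epochId,
                               Iterator<String> rows) {
        if (writer.open(partitionId, epochId)) {
            try {
                while (rows.hasNext()) {
                    writer.process(rows.next());
                }
                writer.close(null);
            } catch (Throwable t) {
                writer.close(t);
            }
        }
        return writer.log.toString();
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b");
        System.out.println(runPartition(new SinkWriter(), 0, 7, rows.iterator()));
    }
}
```

The epoch ID passed to open() is what lets a writer deduplicate output when a micro-batch is retried.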
Apache Spark : When not to use mapPartition and foreachPartition?
When you write Spark jobs that use either mapPartitions or foreachPartition, you can only transform the partition's data or iterate over it, respectively. The anonymous function passed as a parameter is executed on the executors, so there is no viable way from within it to invoke code that coordinates all the nodes, e.g. rdd.reduceByKey - such code must be executed only from the driver. Thus only driver code can access DataFrames, Datasets, and the Spark session.
Please find here a detailed discussion of this issue and possible solutions.
What is the difference between forEachAsync vs forEachPartitionAsync in Apache Spark?
I believe you are already aware of what Async means and are really asking about the difference between foreach and foreachPartition. The difference is that foreachPartition lets you run custom code once per partition, which you cannot do with foreach.

For example, suppose you want to save your results to a database. As you know, opening and closing DB connections is costly, so one connection (or pool) per partition is best. So your code would be:
rdd.foreachPartition { part =>
  val db = ??? // mysql..blablabla - open the connection once per partition
  part.foreach { record =>
    db.save(record)
  }
  db.close()
}
You can't do this with foreach - foreach iterates once per record.

Remember, one partition always runs on one executor. So if you have any costly pre-work to do before you start processing the data, use foreachPartition; if not, just use foreach. Both are parallel: one gives you flexibility, the other simplicity.
Does sparks foreachPartition run on the driver or on the worker?
Same as foreach(), foreachPartition() is executed on the workers. There is no reason to transfer data to the driver to process it.
rdd.foreachPartition { rddpartition =>
  val thinUrl = "some jdbc url"
  val conn = DriverManager.getConnection(thinUrl)
  rddpartition.foreach { record =>
    conn.createStatement().execute("some statement")
  }
  conn.commit()
  conn.close()
}
How to use foreachPartition in Spark 2.2 to avoid Task Serialization error
Write an implementation of ForeachWriter and then use it. (Avoid anonymous classes that capture non-serializable objects - in your case it's ProducerRecord.)

Example: val writer = new YourForeachWriter[String]

There is also a helpful article about Spark serialization problems: https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error
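The underlying fix is independent of Kafka: anything the task closure captures must be serializable, so non-serializable members (like a producer) should be marked transient and created on the executor side, e.g. in open(). Here is a minimal, Spark-free sketch of that idea, using a hypothetical stand-in writer class and plain Java serialization to simulate shipping it to an executor:

```java
import java.io.*;

public class SerializableWriterDemo {
    // Stand-in for a non-serializable resource like a Kafka producer.
    static class Producer {
        void send(String record) { /* pretend to send */ }
    }

    // Serializable "writer": the Producer is transient and created only
    // after deserialization, so it never travels through serialization.
    static class MyWriter implements Serializable {
        private transient Producer producer;

        void open() {
            producer = new Producer(); // created on the "executor"
        }
        void process(String value) {
            producer.send(value);
        }
    }

    public static void main(String[] args) throws Exception {
        MyWriter writer = new MyWriter();

        // Simulate shipping the writer to an executor.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(writer);
        MyWriter shipped = (MyWriter) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();

        shipped.open();           // resource created after deserialization
        shipped.process("hello"); // no NotSerializableException anywhere
        System.out.println("ok");
    }
}
```

Had Producer been a regular (non-transient) field initialized in the constructor, the writeObject call above would have thrown a NotSerializableException - which is exactly the task-serialization error the question describes.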