Apache Spark - foreach vs foreachPartition: When to Use What

How to use foreachPartition in Spark?

I think you have the wrong impression of what BoxedUnit is and therefore insist on using the Scala interface in Java, which is overly complicated due to the amount of hidden complexity in Scala that gets exposed to Java. scala.Function1<scala.collection.Iterator<T>, scala.runtime.BoxedUnit> is the implementation of (Iterator[T]) => Unit - a Scala function that takes an Iterator[T] and returns the Unit type. Unit in Scala is the equivalent of Java's void. BoxedUnit is the boxed version of Unit - it is a heap object holding the singleton unit value in its UNIT member and is an implementation detail that almost never surfaces in Scala programs. If the dataset is a DataFrame, then T will be org.apache.spark.sql.Row and you need to process Scala iterators over collections of Row objects.

To define something that is scala.Function1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit> in Java, you need to create an instance of AbstractFunction1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit> and override its apply() method where you must return BoxedUnit.UNIT. You also need to make it serializable, so you usually declare your own class that inherits from AbstractFunction1 and implements Serializable. You may also Java-fy it by exposing a different, more Java-friendly abstract method to be overridden later:

import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.collection.JavaConverters;
import java.io.Serializable;
import java.util.Iterator;

abstract class MyPartitionFunction<T> extends AbstractFunction1<scala.collection.Iterator<T>, BoxedUnit>
    implements Serializable {

  @Override
  public BoxedUnit apply(scala.collection.Iterator<T> iterator) {
    // Convert the Scala iterator to a Java one and delegate to the Java-friendly method
    call(JavaConverters.asJavaIteratorConverter(iterator).asJava());
    return BoxedUnit.UNIT;
  }

  public abstract void call(Iterator<T> iterator);
}

df.foreachPartition(new MyPartitionFunction<Row>() {
  @Override
  public void call(Iterator<Row> iterator) {
    while (iterator.hasNext()) {
      Row row = iterator.next();
      // do something with the row
    }
  }
});

This is a fair amount of implementation complexity, which is why there is a Java-specific overload that takes a ForeachPartitionFunction<T> instead, and the above code becomes:

import org.apache.spark.sql.Row;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import java.util.Iterator;

df.foreachPartition(new ForeachPartitionFunction<Row>() {
  @Override
  public void call(Iterator<Row> iterator) throws Exception {
    while (iterator.hasNext()) {
      Row row = iterator.next();
      // do something with the row
    }
  }
});

The functionality is exactly the same as that of the Scala interface; Apache Spark simply does the iterator conversion for you and gives you a friendly Java interface that doesn't require you to import or implement Scala types.

That said, I think you have a bit of misunderstanding of how Spark works. You do not need to use foreachPartition to process streaming data in batches. That is done automatically for you by the streaming engine of Spark. You write streaming queries that specify transformations and aggregations that then get applied progressively as new data arrives from the stream.

foreachPartition is a form of foreach reserved for some special batch-processing cases, for example when you need to do expensive object instantiation in the processing function and doing it for each row would incur huge overhead. With foreachPartition, your processing function is called only once per partition, so you can instantiate the expensive objects once and then iterate over the partition's data. This decreases processing time because you do the expensive work just once.
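With the Java Dataset API, that pattern looks roughly like the following sketch; ExpensiveClient and its process()/close() methods are hypothetical placeholders for whatever costly resource you need:

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;
import java.util.Iterator;

df.foreachPartition(new ForeachPartitionFunction<Row>() {
  @Override
  public void call(Iterator<Row> partition) throws Exception {
    // Expensive setup happens once per partition, not once per row
    ExpensiveClient client = new ExpensiveClient();
    while (partition.hasNext()) {
      Row row = partition.next();
      client.process(row); // cheap per-row work
    }
    client.close();
  }
});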

But then, you cannot even call foreach() or foreachPartition() on a streaming source as that results in an AnalysisException. Instead, you have to use the foreach() or foreachBatch() methods of DataStreamWriter. DataStreamWriter.foreach() takes an instance of ForeachWriter while DataStreamWriter.foreachBatch() takes a void function that receives a dataset and the batch ID. ForeachWriter receives an epoch ID in its open() method. Again, foreachBatch() comes in both Scala and Java flavours that are equivalent in functionality, so please use the Java-specific one if you are going to write in Java.
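For illustration, here is a minimal Java sketch of the foreachBatch() route; streamingDf stands for your streaming Dataset, and the output format and path are made-up placeholders:

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

streamingDf.writeStream()
  .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
    @Override
    public void call(Dataset<Row> batchDf, Long batchId) throws Exception {
      // batchDf is a regular (non-streaming) Dataset, so the usual batch
      // operations, including foreachPartition(), are allowed here
      batchDf.write().mode("append").format("parquet").save("/some/output/path");
    }
  })
  .start();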

Apache Spark: When not to use mapPartition and foreachPartition?

When you write Spark jobs that use either mapPartitions or foreachPartition, you can only transform the partition's data or iterate over it, respectively. The anonymous function passed as a parameter is executed on the executors, so there is no viable way to run code that involves all the nodes, e.g. df.reduceByKey, from one particular executor. Such code should be executed only from the driver node; only driver code can access DataFrames, Datasets and the SparkSession.

Please find here a detailed discussion of this issue and possible solutions.
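As a rough illustration of that boundary (the column name is made up, and the body only sketches per-record work):

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Iterator;

// Driver side: transformations, aggregations and anything that touches the
// SparkSession or another Dataset must happen here
Dataset<Row> aggregated = df.groupBy("someColumn").count();

aggregated.foreachPartition(new ForeachPartitionFunction<Row>() {
  @Override
  public void call(Iterator<Row> partition) throws Exception {
    // Executor side: only plain per-record work belongs here. Referencing
    // the SparkSession, df or another Dataset inside this method fails at
    // runtime, because those objects live only on the driver
    while (partition.hasNext()) {
      Row row = partition.next();
      // e.g. push the row to an external system
    }
  }
});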

What is the difference between forEachAsync vs forEachPartitionAsync in Apache Spark?

I believe you are already aware of what Async means and are really asking about the difference between foreach and foreachPartition.

And the difference is that foreachPartition allows you to run custom code once per partition, which you can't do with foreach.

For example, say you want to save your results to a database. As you know, opening and closing DB connections is costly, so one connection (or pool) per executor is best. So your code would be:

rdd.foreachPartition(part => {
  val db = ... // open your MySQL connection (or grab one from a pool) here
  part.foreach(record => {
    db.save(record)
  })
  db.close()
})

You can't do this with foreach; foreach invokes your function once for each record.

Remember, one partition is always processed on a single executor. So if you have any costly pre-work to do before you start processing the data, use foreachPartition; if not, just use foreach. Both are parallel: one gives you flexibility, the other gives you simplicity.
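As for the Async part, the asynchronous variants simply submit the job and return a future instead of blocking the driver. A minimal sketch with the Java RDD API, assuming rdd is a JavaRDD<String> and the per-record work is just a placeholder:

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Iterator;

// foreachPartitionAsync returns immediately; the driver thread is free to do
// other work while the job runs on the executors
JavaFutureAction<Void> job = rdd.foreachPartitionAsync(new VoidFunction<Iterator<String>>() {
  @Override
  public void call(Iterator<String> partition) throws Exception {
    // costly setup once per partition, then iterate the records
    while (partition.hasNext()) {
      String record = partition.next();
      // save the record, send it somewhere, etc.
    }
  }
});

job.get(); // block only when you actually need the job to have finished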

Does Spark's foreachPartition run on the driver or on the worker?

Just like foreach(), foreachPartition() is executed on the workers. There is no reason to transfer the data to the driver to process it.

rdd.foreachPartition { rddpartition =>
  val thinUrl = "some jdbc url"
  val conn = DriverManager.getConnection(thinUrl)
  rddpartition.foreach { record =>
    conn.createStatement().execute("some statement")
  }
  conn.commit()
}

How to use foreachPartition in Spark 2.2 to avoid Task Serialization error

Write an implementation of ForeachWriter and then use it. (Avoid anonymous classes that capture non-serializable objects - in your case that is ProducerRecord.)

Example: val writer = new YourForeachWriter[String]
Also, here is a helpful article about Spark serialization problems: https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error
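If you are writing in Java, a rough sketch of such a writer could look like the class below; the broker address, topic name and String payload type are assumptions, not part of the original question:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.ForeachWriter;
import java.util.Properties;

// A named writer class: the non-serializable KafkaProducer is created in
// open(), on the executor, so it is never captured in a serialized closure
public class KafkaForeachWriter extends ForeachWriter<String> {
  private transient KafkaProducer<String, String> producer;

  @Override
  public boolean open(long partitionId, long epochId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder brokers
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);
    return true;
  }

  @Override
  public void process(String value) {
    producer.send(new ProducerRecord<>("some-topic", value)); // placeholder topic
  }

  @Override
  public void close(Throwable errorOrNull) {
    if (producer != null) {
      producer.close();
    }
  }
}

You would then pass it to the stream with something like df.writeStream().foreach(new KafkaForeachWriter()).start().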


