Spark Union of Multiple Rdds

Spark union of multiple RDDs

If these are RDDs you can use SparkContext.union method:

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()

## [1, 2, 3, 4, 5, 6, 7, 8, 9]

There is no DataFrame equivalent but it is just a matter of a simple one-liner:

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## | k| v|
## +---+----+
## | 1|foo1|
## | 2|bar1|
## | 3|foo2|
## | 4|bar2|
## | 5|foo3|
## | 6|bar3|
## +---+----+

If number of DataFrames is large using SparkContext.union on RDDs and recreating DataFrame may be a better choice to avoid issues related to the cost of preparing an execution plan:

def unionAll(*dfs):
first, *_ = dfs # Python 3.x, for 2.x you'll have to unpack manually
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]),
first.schema
)

How to zip multiple RDDs in pyspark?

Good question. The introduction of zipPartitions in PySpark was proposed in 2016, but as you can read among comments, they never managed to find a good compromise between performances and solution complexity. The issue is now closed but I do not think it will be reopened in the near future. This is the solution proposed by Joseph E. Gonzalez.


The quickest way to use that API is to write it yourself (performances will not be that good of course). A very naive zipPartitions implementation is:

def zipPartitions(rdd1, rdd2, func):
rdd1_numPartitions = rdd1.getNumPartitions()
rdd2_numPartitions = rdd2.getNumPartitions()
assert rdd1_numPartitions == rdd2_numPartitions, "rdd1 and rdd2 must have the same number of partitions"

paired_rdd1 = rdd1.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
paired_rdd2 = rdd2.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))

zipped_rdds = paired_rdd1.join(paired_rdd2, numPartitions=rdd1_numPartitions)\
.flatMap(lambda x: func(x[1][0], x[1][1]))

return zipped_rdds

You can test it with:

rdd1 = sc.parallelize(range(30), 3)
rdd2 = sc.parallelize(range(50), 3)

zipPartitions(rdd1, rdd2, lambda it1, it2: itertools.zip_longest(it1, it2))\
.glom().collect()

Arguments are easy to understand, they are, in order, the first rdd, the second rdd and a function accepting 2 partition iterators, one for each rdd.
With the assert rdd1_numPartitions == rdd2_numPartitions I make sure both rdd have the same number of partitions, that is a precondition for the Scala version too.
Then I use mapPartitionsWithIndex on both rdds to transform, for example, an rdd with two partitions, from:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

To

[(0, [0, 1, 2, 3, 4]), (1, [5, 6, 7, 8, 9])]

Note: the conversion from it to list(it) is unfortunately necessary because in most of the python implementations you cannot pickle generators, and the it parameter is a generator. There is an exception that allows you to convert it into a list, a case that pyspark handles with a very clever optimization, I am talking about an rdd created from a range(). In fact, considering the previous example,

range(10)

becomes

[(0, range(0, 5)), (1, range(5, 10))]

Next I can join the two new rdds on the partition index. numPartitions can be easily predicted because we have previously asserted that both rdd must have the same number of partitions, so they are in a 1-to-1 relationship. Finally, I can apply the passed function and flatten the list of partition results.

Apache Spark - Intersection of Multiple RDDs

Try:

val rdds = Seq(
sc.parallelize(Seq(1, 3, 5)),
sc.parallelize(Seq(3, 5)),
sc.parallelize(Seq(1, 3))
)
rdds.map(rdd => rdd.map(x => (x, None))).reduce((x, y) => x.join(y).keys.map(x => (x, None))).keys

Efficient union of multiple small RDDs

Why it's slow

sc.textFile is not optimized for this case. Remember that optimal partition size is on the order of 100 MB, and right now, your sc.union RDD is getting one partition per file -- <8k. Spark overhead is going to absolutely dominate anything you do in this paradigm.

You mentioned "increasing the partitions" in your question, but I think here you probably want to reduce the number of partitions. I'm not sure where numPartitions came from, but this should be roughly total data size / 100MB. Your .partitionBy step is performing a full shuffle, and so there will still be lots of overhead from the original too-many-partitions RDD, but it will likely perform better downstream.

Another execution model to try

Here's something else to try: a no-shuffle coalesce on the union:

val optimalNPartitions = ??? // calculate total size / 100MB here
val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
.flatMap(line => line.split(" "))
.coalesce(optimalNPartitions, shuffle = false) // try with shuf = true as well!
.map((_,1))
.reduceByKey(_ + _)

One final note

While you say you're partitioning to a new hash partitioner to make reduceByKey more efficient, this is actually wrong.

Let's look at the two models. First, the one you had: partitionBy followed by reduceByKey. The partition step will do a full shuffle against a new hash partitioner -- all the data needs to move across the network. When you call reduce, all the like keys are already in the same place so no shuffle needs to happen.

Second, leave out partitionBy and just call reduceByKey. In this model, you come into the reduce with no partitioner, so you have to shuffle. But before you shuffle each key, you're going to reduce locally -- if you had the word "dog" 100 times on one partition, you're going to shuffle ("dog", 100) instead of ("dog", 1) 100 times. See where I'm going with this? Reduce actually requires only a partial shuffle, whose size is determined by the sparsity of keys (if you only have a few unique keys, very little is shuffled. If everything is unique, everything is shuffled).

Clearly model 2 is what we want. Get rid of that partitionBy!

Union RDDs after a loop PySpark

The best way to do this :

def get_population_id(repartionned_rdd):

idx = range(len(repartionned_rdd))

FullRDD = sc.emptyRDD()

for (i, rdd) in zip(idx, repartionned_rdd):

FullRDD = FullRDD.union(rdd.map(lambda x: x + (float(i),)))

return FullRDD

Merge multiple RDD generated in loop

Instead of using vars, you can use functional programming paradigms to achieve what you want :

val rdd = (1 to n).map(x => helperFunction()).reduce(_ union _)

Also, if you still need to create an empty RDD, you can do it using :

val empty = sc.emptyRDD[(long, long, String)]

How to perform union on two DataFrames with different amounts of columns in Spark?

In Scala you just have to append all missing columns as nulls.

import org.apache.spark.sql.functions._

// let df1 and df2 the Dataframes to merge
val df1 = sc.parallelize(List(
(50, 2),
(34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
(26, true, 60000.00),
(32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50| 2| null| null|
| 34| 4| null| null|
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+

Update

Both temporal DataFrames will have the same order of columns, because we are mapping through total in both cases.

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50| 2| null| null|
| 34| 4| null| null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+


Related Topics



Leave a reply



Submit