saveAsTextFile() to write the final RDD as a single text file - Apache Spark

To write as a single file there are a few options. If you're writing to HDFS or a similar distributed store, you can first coalesce your RDD down to a single partition (note that your data must then fit on a single worker), or you can collect the data to the driver and then use a file writer.

pyspark saveAsTextFile writes multiple files but the data is all written to only 1 single file

Is this normal behaviour?

Yes, it is normal behavior. Your example data returns only one meaningful key - "test". Once you reduceByKey, all the values for this key are shuffled to a single partition. The remaining values are simply inconsequential.

The last two transformations

map(lambda (a, b): b if a == "test" else "").flatMap(lambda x: x)\

could be rewritten for clarity as:

filter(lambda (a, b): a == "test").values().flatMap(lambda x: x)

In other words your code keeps only values with the "test" key, and these are already on a single partition.

The part that really doesn't make sense is grouping by key. You could just as well use

logs.mapPartitions(lambda x: test()).filter(lambda (a, b): a == "test")

which would keep data distributed as a side effect.

Write RDD as textfile using Apache Spark

You can use the coalesce method to save into a single file. This way your code will look like this:

val myFile = sc.textFile("file.txt")
val finalRdd = doStuff(myFile)
finalRdd.coalesce(1).saveAsTextFile("newfile")

There is also another method, repartition, to do the same thing; however, it will cause a shuffle which may be very expensive, while coalesce will try to avoid a shuffle.

how to make saveAsTextFile NOT split output into multiple files?

The reason it saves it as multiple files is that the computation is distributed. If the output is small enough that you think it can fit on one machine, then you can end your program with

val arr = year.collect()

And then save the resulting array as a file. Another way would be to use a custom partitioner, partitionBy, and make everything go to one partition, though that isn't advisable because you won't get any parallelization.
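If you go that route, a rough sketch might look like this (assuming year is an RDD[String], that it fits in driver memory, and that the local output path is just a placeholder):

import java.io.PrintWriter

val arr = year.collect()                      // bring all rows to the driver
val writer = new PrintWriter("/tmp/year.txt") // placeholder path on the driver's local disk
try {
  arr.foreach(line => writer.println(line))   // one element per line
} finally {
  writer.close()
}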

If you require the file to be saved with saveAsTextFile you can use coalesce(1, true).saveAsTextFile(). This basically means do the computation, then coalesce down to 1 partition. You can also use repartition(1), which is just a wrapper for coalesce with the shuffle argument set to true. Looking through the source of RDD.scala is how I figured most of this out; you should take a look.
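As a quick sketch, both variants look like this (the output directories are placeholders, and year stands for the final RDD):

// run the computation in parallel, then shuffle everything into 1 partition
year.coalesce(1, shuffle = true).saveAsTextFile("/tmp/output_coalesce")

// repartition(1) is just coalesce(1, shuffle = true) under the hood
year.repartition(1).saveAsTextFile("/tmp/output_repartition")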

Combine Spark output into a single file

Please find below some suggestions:

  • collect() and saveAsTextFile() are both actions, i.e. each of them triggers the computation of the results. Therefore it is redundant to call both of them.

  • In your case you just need to store the data with saveAsTextFile(); there is no need to call collect().

  • collect() returns an array of items (in your case you are not using the returned variable)

  • As Glennie and Akash suggested, just use coalesce(1) to force one single partition. coalesce(1) will not cause shuffling, and hence is much more efficient.

  • In the given code you are using the RDD API of Spark; I would suggest using DataFrames/Datasets instead (a rough sketch follows this list).
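As a rough sketch of the Dataset route (assuming a SparkSession named spark; the input and output paths are placeholders):

// read the input as a Dataset[String] instead of an RDD
val ds = spark.read.textFile("/tmp/input.txt")

// coalesce(1) forces a single partition, so only one part file is written
ds.coalesce(1).write.text("/tmp/single_file_output")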

Please refer to the following links for further details on RDDs and DataFrames:

Difference between DataFrame, Dataset, and RDD in Spark

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

How to store the text file on the Master?

That is expected behavior. The path is resolved on the machine where it is executed, i.e. the slaves. I'd recommend either using a cluster FS (e.g. HDFS) or .collect()-ing your data so you can save it locally on the master. Beware of OOM if your data is large.
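For instance, a minimal sketch of the cluster-FS option (the RDD name and the HDFS URI are placeholders):

// every executor writes its partitions to the same HDFS directory,
// which is visible from the whole cluster rather than a single slave's local disk
finalRdd.saveAsTextFile("hdfs://namenode:8020/user/spark/output")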

How can I use the saveAsTextFile() function in Apache Spark?

Your problem seems to be in the map with word(0), not in the saveAsTextFile.

Since you are doing the split in the flatMap, the output of that step is an RDD with each word as a row; there is not an array for each element, just a single word.

flatMap flattens the array.

Check the difference

flatMap

scala> val rdd =  sc.textFile("/Users/my_name/Workspace/spark-3.0.2-bin-hadoop2.7/README.md").flatMap(line => line.split(' '))


scala> rdd.take(5).foreach(println)
#
Apache
Spark

Spark

map

scala> val rdd =  sc.textFile("/Users/sergio.couto/Workspace/spark-3.0.2-bin-hadoop2.7/README.md").map(line => line.split(' '))

scala> rdd.take(5).foreach(println)
[Ljava.lang.String;@7cd1d235
[Ljava.lang.String;@1736262a
[Ljava.lang.String;@7252e063
[Ljava.lang.String;@5edd0a7e
[Ljava.lang.String;@2c6f2bea

So you don't need to do word(0); it's not an array. You can just do word => (word, word.length)

Full Example:

val rdd =  sc.textFile("/Users/my_name/Workspace/spark-3.0.2-bin-hadoop2.7/README.md")
.flatMap(line => line.split(' '))
.map(word => (word, word.length))
.groupByKey()
.map(pair => (pair._1, pair._2.sum/pair._2.size.toDouble))


scala> rdd.take(10).foreach(println)
(package,7.0)
(this,4.0)
(integration,11.0)
(Python,6.0)
(cluster.,8.0)
(its,3.0)
([run,4.0)
(There,5.0)
(general,7.0)
(YARN,,5.0)

Then you will be able to perform saveAsTextFile or whichever method you need.
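For example, to end up with a single text file (the output path is a placeholder):

// one partition means only one part file is produced in the output directory
rdd.coalesce(1).saveAsTextFile("/tmp/word_lengths")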


