saveAsTextFile() to write the final RDD as a single text file - Apache Spark
To write a single file there are a few options. If you're writing to HDFS or a similar distributed store, you can first coalesce
your RDD down to a single partition (note that your data must then fit on a single worker), or you can collect
the data to the driver and then use a file writer.
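A minimal sketch of both options, assuming an RDD[String] named finalRdd and illustrative output paths:

```scala
import java.io.PrintWriter

// Option 1: coalesce down to one partition before writing.
// The whole dataset must then fit on a single worker.
finalRdd.coalesce(1).saveAsTextFile("hdfs:///output/single-part")

// Option 2: collect to the driver and write with a plain file writer.
// The whole dataset must then fit in driver memory.
val writer = new PrintWriter("output.txt")
try finalRdd.collect().foreach(writer.println)
finally writer.close()
```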
pyspark saveAsTextFile writes multiple files but the data is all written to a single file
Is this normal behaviour?
Yes, it is normal behavior. Your example data contains only one meaningful key, "test"
. Once you reduceByKey
, all the values for this key are shuffled to a single partition. The remaining values are inconsequential.
The last two transformations
map(lambda (a, b): b if a == "test" else "").flatMap(lambda x: x)
could be rewritten for clarity as:
filter(lambda (a, b): a == "test").values().flatMap(lambda x: x)
In other words, your code keeps only the values with the "test"
key, and these are already on a single partition.
The part that really doesn't make sense is grouping by key. You could just as well use
logs.mapPartitions(lambda x: test()).filter(lambda (a, b): a == "test")
which would keep the data distributed as a side effect.
Write RDD as textfile using Apache Spark
You can use the coalesce
method to save into a single file. This way your code will look like this:
val myFile = sc.textFile("file.txt")
val finalRdd = doStuff(myFile)
finalRdd.coalesce(1).saveAsTextFile("newfile")
There is also another method, repartition
, that does the same thing; however, it will cause a shuffle, which may be very expensive, while coalesce will try to avoid a shuffle.
how to make saveAsTextFile NOT split output into multiple files?
The reason it saves it as multiple files is because the computation is distributed. If the output is small enough such that you think you can fit it on one machine, then you can end your program with
val arr = year.collect()
And then save the resulting array as a file. Another way would be to use a custom partitioner, partitionBy
, and make it so everything goes to one partition, though that isn't advisable because you won't get any parallelization.
If you require the file to be saved with saveAsTextFile
you can use coalesce(1,true).saveAsTextFile()
. This basically means do the computation, then coalesce to 1 partition. You can also use repartition(1)
, which is just a wrapper for coalesce
with the shuffle argument set to true. Looking through the source of RDD.scala is how I figured most of this out; you should take a look.
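As the answer notes, repartition(1) is just coalesce with shuffling enabled; a sketch of both, with illustrative output paths:

```scala
// coalesce(1, shuffle = true): compute in parallel upstream,
// then shuffle everything down to a single partition before writing.
finalRdd.coalesce(1, shuffle = true).saveAsTextFile("out-coalesced")

// repartition(1) is defined in RDD.scala as coalesce(numPartitions, shuffle = true),
// so this is equivalent:
finalRdd.repartition(1).saveAsTextFile("out-repartitioned")
```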
Combined Spark output into single file
Please find below some suggestions:
- collect()
and saveAsTextFile()
are both actions, which means they trigger the computation; it is therefore redundant to call both of them. In your case you just need to store the data with
saveAsTextFile()
; there is no need to call collect()
.
- collect()
returns an array of items (in your case you are not using the returned variable).
- As Glennie and Akash suggested, just use
coalesce(1)
to force one single partition. coalesce(1)
will not cause shuffling and hence is much more efficient.
- In the given code you are using the RDD API of Spark; I would suggest using DataFrames/Datasets instead.
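A sketch of the DataFrame/Dataset equivalent, assuming a SparkSession named spark and illustrative paths:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("single-file-output").getOrCreate()

// Read the lines as a Dataset[String] instead of an RDD.
val lines = spark.read.textFile("file.txt")

// coalesce(1) again forces a single output part file.
lines.coalesce(1).write.text("newfile")
```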
Please refer to the following links for further details on RDDs and dataframes:
Difference between DataFrame, Dataset, and RDD in Spark
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
How to store the text file on the Master?
That is expected behavior. path
is resolved on the machine where the task
is executed, i.e. the workers. I'd recommend either using a cluster FS
(e.g. HDFS) or calling .collect()
on your data so you can save it locally on
the master. Beware of OOM if your data is large.
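A sketch of both options, assuming an RDD[String] named rdd and illustrative paths:

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Option A: write to a cluster file system visible to all workers.
rdd.saveAsTextFile("hdfs:///user/me/output")

// Option B: collect to the driver (beware OOM for large data)
// and save locally on the master with plain Java IO.
Files.write(Paths.get("/tmp/output.txt"), rdd.collect().toSeq.asJava)
```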
How can I use a saveAsTextFile() function in Apache Spark?
Your problem seems to be in the map with word(0)
, not in saveAsTextFile.
Since you are doing the split in the flatMap
, the output of that step is an RDD with each word as a row; there is no array for each element, just a single word.
flatMap flattens the array.
Check the difference
flatMap
scala> val rdd = sc.textFile("/Users/my_name/Workspace/spark-3.0.2-bin-hadoop2.7/README.md").flatMap(line => line.split(' '))
scala> rdd.take(5).foreach(println)
#
Apache
Spark
Spark
map
scala> val rdd = sc.textFile("/Users/sergio.couto/Workspace/spark-3.0.2-bin-hadoop2.7/README.md").map(line => line.split(' '))
scala> rdd.take(5).foreach(println)
[Ljava.lang.String;@7cd1d235
[Ljava.lang.String;@1736262a
[Ljava.lang.String;@7252e063
[Ljava.lang.String;@5edd0a7e
[Ljava.lang.String;@2c6f2bea
So you don't need word(0); it's not an array, and you can just do word => (word, word.length)
Full Example:
val rdd = sc.textFile("/Users/my_name/Workspace/spark-3.0.2-bin-hadoop2.7/README.md")
.flatMap(line => line.split(' '))
.map(word => (word, word.length))
.groupByKey()
.map(pair => (pair._1, pair._2.sum/pair._2.size.toDouble))
scala> rdd.take(10).foreach(println)
(package,7.0)
(this,4.0)
(integration,11.0)
(Python,6.0)
(cluster.,8.0)
(its,3.0)
([run,4.0)
(There,5.0)
(general,7.0)
(YARN,,5.0)
Then you will be able to apply saveAsTextFile
or whichever method you need.
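Once the pairs look right, saving follows the same pattern as the earlier answers (the output paths are illustrative):

```scala
// Each (word, averageLength) pair becomes one line like "(package,7.0)".
rdd.coalesce(1).saveAsTextFile("/tmp/word-lengths")

// Or format the lines explicitly before writing:
rdd.map { case (word, avg) => s"$word\t$avg" }
   .coalesce(1)
   .saveAsTextFile("/tmp/word-lengths-tsv")
```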