Read Whole Text Files from a Compression in Spark

Read whole text files from a compression in Spark

One possible solution is to read data with binaryFiles and extract content manually.


import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // Flatten the Options
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toVector
}

// Decode raw bytes using the given charset (UTF-8 by default)
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x =>
  extractFiles(x).toOption).mapValues(_.map(decode()))

Required dependency (sbt):

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full usage example with Java:


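With PySpark:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    # Open the gzip-compressed tar archive from the in-memory bytes
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    # Keep regular files only (skip directories and links)
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))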
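
The extraction step can be tried without a cluster. The following is a minimal sketch, assuming the archive bytes are already in memory (which is what binaryFiles hands to the function in PySpark). The helper make_archive and the file names are hypothetical and exist only to build test input.

```python
import io
import tarfile


def extract_files(data: bytes) -> list:
    """Return the contents of every regular file in a gzip-compressed tar archive."""
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
        return [tar.extractfile(m).read() for m in tar if m.isfile()]


def make_archive(files: dict) -> bytes:
    """Build a small .tar.gz in memory from a {name: text} mapping (hypothetical helper)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            payload = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


archive = make_archive({"a.txt": "first file\n", "b.txt": "second file\n"})
texts = [raw.decode("utf-8") for raw in extract_files(archive)]
print(texts)
```

Each archive becomes one list of decoded strings, which mirrors what the two mapValues calls produce per input file.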

Does spark compress on the fly when reading data?

No, it doesn't compress the raw files.

How to read .gz compressed file using spark DF or DS?

Reading a compressed csv is done in the same way as reading an uncompressed csv file. For Spark version 2.0+ it can be done as follows using Scala (note the extra option for the tab delimiter):

val df = spark.read.option("sep", "\t").csv("file.csv.gz")

PySpark:

df = spark.read.csv("file.csv.gz", sep='\t')

The only extra consideration is that a gz file is not splittable, so Spark has to read the whole file on a single core, which slows things down. After the read is done, the data can be shuffled (for example with repartition) to increase parallelism.
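
A minimal PySpark sketch of the read-then-shuffle step described above. The function name read_gz_tsv and the partition count are hypothetical, and spark is assumed to be an existing SparkSession passed in by the caller.

```python
def read_gz_tsv(spark, path, num_partitions):
    """Read a gzip-compressed, tab-separated file and spread it across partitions.

    `spark` is assumed to be an existing SparkSession.
    """
    # A single .gz file cannot be split, so it is read by one task.
    df = spark.read.csv(path, sep="\t")
    # repartition() triggers a shuffle so later stages can run in parallel.
    return df.repartition(num_partitions)


# Usage (assumes a running SparkSession named `spark`):
# df = read_gz_tsv(spark, "file.csv.gz", 8)
```

A suitable partition count depends on the cluster and the data size, so treat the value 8 as a placeholder.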

Read a compressed file *with custom extension* with spark

There is a workaround for this problem.

The relevant section:

...extend GzipCodec and override the getDefaultExtension method.

package smx.ananke.spark.util.codecs

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec {

  // Extension that Hadoop maps to this codec
  override def getDefaultExtension(): String = ".gz.tmp" // You should change it to ".Z"

}

Now register this codec by setting it on SparkConf:

val conf = new SparkConf()

// Custom codec that processes .gz.tmp extensions as the common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")

How to read multiple text files into a single RDD?

You can specify whole directories, use wildcards, and even pass a comma-separated list of directories and wildcards.
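
A minimal PySpark sketch of the comma-separated form. The function name read_text_paths and all paths are hypothetical, and sc is assumed to be an existing SparkContext passed in by the caller.

```python
def read_text_paths(sc, paths):
    """Read several directories, files, and glob patterns into one RDD of lines.

    `sc` is assumed to be an existing SparkContext.
    """
    # textFile accepts a single string of comma-separated paths.
    return sc.textFile(",".join(paths))


# Hypothetical paths, for illustration only.
example_paths = ["/data/dir1", "/data/logs/part-00[0-5]*", "/data/one/file.txt"]
print(",".join(example_paths))

# Usage (assumes a running SparkContext named `sc`):
# rdd = read_text_paths(sc, example_paths)
```

Joining the paths in a helper keeps the call site readable when the list is built programmatically.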


As Nick Chammas points out this is an exposure of Hadoop's FileInputFormat and therefore this also works with Hadoop (and Scalding).

