Read Whole Text Files from a Compression in Spark

One possible solution is to read data with binaryFiles and extract content manually.

Scala:

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // Flatten the Options
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read up to n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
        // Take as long as we've read something
        .takeWhile(_._1 > 0)
        .map(_._2)
        .flatten
        .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x =>
  extractFiles(x).toOption).mapValues(_.map(decode()))

Dependency (build.sbt):

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Python:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
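
To sanity-check the extraction logic without a cluster, the same function can be exercised on an archive built in memory. The following is a minimal sketch using only the standard library; make_targz and the file names are hypothetical, introduced purely for illustration, and extract_files mirrors the extractFiles function above.

```python
import io
import tarfile


def extract_files(raw):
    """Return the contents of every regular file in a tar.gz byte string."""
    with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tar:
        return [tar.extractfile(m).read() for m in tar if m.isfile()]


def make_targz(files):
    """Hypothetical helper: build a tar.gz archive in memory from {name: text}."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


# Two small text files, one of them in a subdirectory of the archive.
archive = make_targz({"a.txt": "hello\n", "dir/b.txt": "world\n"})
texts = [b.decode("utf-8") for b in extract_files(archive)]
```

Because the archive is plain bytes, it has the same shape as the value half of the (path, content) pairs that binaryFiles yields in PySpark.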

Does Spark compress on the fly when reading data?

No, Spark does not compress the raw files when it reads them.

How to read a .gz compressed file using a Spark DataFrame or Dataset?

Reading a compressed CSV file is done in the same way as reading an uncompressed one. For Spark 2.0+ it can be done as follows in Scala (note the extra option for the tab delimiter):

val df = spark.read.option("sep", "\t").csv("file.csv.gz")

PySpark:

df = spark.read.csv("file.csv.gz", sep='\t')

The only extra consideration is that a gz file is not splittable, so Spark has to read the whole file on a single core, which slows things down. After the read is done, the data can be repartitioned (shuffled) to increase parallelism.
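
The repartitioning step might look like the following sketch. It assumes an existing SparkSession is passed in as spark; the helper name read_gz_tsv and the default of 8 partitions are illustrative choices, not recommendations.

```python
def read_gz_tsv(spark, path, num_partitions=8):
    """Read a gzipped, tab-separated file, then spread it over more partitions.

    `spark` is assumed to be an existing SparkSession. The initial read of a
    single .gz file runs as one task; repartition() triggers a shuffle so that
    later stages can run in parallel.
    """
    df = spark.read.csv(path, sep="\t")
    return df.repartition(num_partitions)
```

For example, read_gz_tsv(spark, "file.csv.gz", 16) returns a DataFrame with 16 partitions, at the cost of one shuffle.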

Read a compressed file *with a custom extension* with Spark

A workaround for this problem is described here: http://arjon.es/2015/10/02/reading-compressed-data-with-spark-using-unknown-file-extensions/

The relevant section:

...extend GzipCodec and override the getDefaultExtension method.

package smx.ananke.spark.util.codecs

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.tmp" // You should change it to ".Z"

}

Now we just register this codec by setting
spark.hadoop.io.compression.codecs on SparkConf:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()

// Custom codec that processes .gz.tmp extensions as the common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
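
If registering a custom codec is not practical (for instance from PySpark), a different approach, not the one from the linked post, is to reuse the binaryFiles technique from the first section: read each file as raw bytes and decompress it in Python. This is only a sketch. It assumes the files are gzip data regardless of their extension and that each file fits in executor memory; gunzip_lines and read_gz_any_extension are hypothetical names.

```python
import gzip


def gunzip_lines(raw, encoding="utf-8"):
    """Decompress gzip bytes and return the text split into lines."""
    return gzip.decompress(raw).decode(encoding).splitlines()


def read_gz_any_extension(sc, pattern):
    """Return an RDD of lines from gzip files matching `pattern`.

    `sc` is assumed to be an existing SparkContext. binaryFiles yields
    (path, content) pairs, so the file extension plays no role here.
    """
    return sc.binaryFiles(pattern).flatMap(lambda kv: gunzip_lines(kv[1]))
```

Unlike the codec approach, this loads each file whole, so it suits many small files better than a few very large ones.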

How to read multiple text files into a single RDD?

You can specify whole directories, use wildcards, and even pass a comma-separated list of directories and wildcards. E.g.:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

As Nick Chammas points out, this is an exposure of Hadoop's FileInputFormat, and therefore it also works with Hadoop (and Scalding).
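
When the paths are held in a list, they can be joined into the comma-separated form that textFile expects. A minimal sketch, assuming an existing SparkContext is passed in as sc; text_file_from_paths is a hypothetical helper, and it would not work for paths that themselves contain commas.

```python
def text_file_from_paths(sc, paths):
    """Build a single RDD of lines from several paths or glob patterns.

    `sc` is assumed to be an existing SparkContext. The paths are joined with
    commas because textFile accepts a comma-separated list of inputs.
    """
    return sc.textFile(",".join(paths))
```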


