Read whole text files from a compression in Spark
One possible solution is to read data with binaryFiles
and extract content manually.
Scala:
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._
def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
        // Take as long as we've read something
        .takeWhile(_._1 > 0)
        .map(_._2)
        .flatten
        .toArray})
    .toArray
}
// Decode with the requested charset (defaults to UTF-8)
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)
sc.binaryFiles("somePath").flatMapValues(x =>
  extractFiles(x).toOption).mapValues(_.map(decode()))
libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"
Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
Python:
import tarfile
from io import BytesIO
def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]
(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
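The extraction step can be checked locally, without Spark, before running it on a cluster. The following is a minimal sketch that builds a small tar.gz archive in memory with the standard library and extracts it again. The helper names make_targz and extract_files, and the file names inside the archive, are made up for illustration.

```python
import io
import tarfile


def make_targz(files):
    """Build a .tar.gz archive in memory from a {name: text} dict (test helper)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def extract_files(raw):
    """Return the decoded contents of every regular file in a .tar.gz byte string."""
    with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tar:
        return [tar.extractfile(m).read().decode("utf-8") for m in tar if m.isfile()]


# Hypothetical sample data: two small text files, one in a subdirectory.
archive = make_targz({"a.txt": "first file\n", "dir/b.txt": "second file\n"})
contents = extract_files(archive)
```

Here extract_files takes the same kind of byte string that binaryFiles would hand to mapValues, so a function verified this way should be usable in the Spark pipeline unchanged.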
Does spark compress on the fly when reading data?
No, it doesn't compress the raw files.
How to read .gz compressed file using spark DF or DS?
Reading a compressed CSV file works the same way as reading an uncompressed one. For Spark 2.0+ it can be done as follows in Scala (note the extra option for the tab delimiter):
val df = spark.read.option("sep", "\t").csv("file.csv.gz")
PySpark:
df = spark.read.csv("file.csv.gz", sep='\t')
The only extra consideration is that a gz file is not splittable, so Spark has to read the whole file on a single core, which slows things down. Once the read is done, the data can be shuffled (for example with repartition) to increase parallelism.
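Shuffling the data after the read can be sketched in PySpark as follows. This is only an illustration: read_gz_csv is a hypothetical helper, the spark argument is assumed to be an existing SparkSession, and the default of 8 partitions is an arbitrary value that should be tuned to the cluster.

```python
def read_gz_csv(spark, path, num_partitions=8, sep="\t"):
    """Read a gzip-compressed CSV and spread the rows over several partitions.

    The gz file itself is still read by a single task; repartition() adds a
    shuffle so that later stages can run in parallel.
    """
    df = spark.read.csv(path, sep=sep)
    return df.repartition(num_partitions)


# Usage, assuming `spark` is an active SparkSession:
# df = read_gz_csv(spark, "file.csv.gz")
# df.rdd.getNumPartitions()
```

The shuffle has a cost of its own, so this tends to pay off only when the work done after the read is heavier than the read itself.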
Read a compressed file *with custom extension* with spark
There is a workaround for this problem: http://arjon.es/2015/10/02/reading-compressed-data-with-spark-using-unknown-file-extensions/
The relevant section:
...extend GzipCodec and override the getDefaultExtension method.
package smx.ananke.spark.util.codecs
import org.apache.hadoop.io.compress.GzipCodec
class TmpGzipCodec extends GzipCodec {
  override def getDefaultExtension(): String = ".gz.tmp" // You should change it to ".Z"
}
Now we just register this codec by setting
spark.hadoop.io.compression.codecs on SparkConf:
val conf = new SparkConf()
// Custom codec that processes .gz.tmp extensions as common gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")
val sc = new SparkContext(conf)
val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
How to read multiple text files into a single RDD?
You can specify whole directories, use wildcards, and even pass a comma-separated list of directories and wildcards. E.g.:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
As Nick Chammas points out, this behavior comes straight from Hadoop's FileInputFormat,
and therefore it also works with Hadoop (and Scalding).
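When the list of inputs is built programmatically, the individual paths can be joined into the single comma-separated string that textFile expects. A minimal PySpark-flavoured sketch, assuming sc is an existing SparkContext; join_paths is a hypothetical helper, and the paths are the ones from the example above.

```python
def join_paths(paths):
    """Join individual paths or glob patterns into one comma-separated string."""
    cleaned = [p.strip() for p in paths if p and p.strip()]
    return ",".join(cleaned)


paths = [
    "/my/dir1",
    "/my/paths/part-00[0-5]*",
    "/another/dir",
    "/a/specific/file",
]
combined = join_paths(paths)

# Usage, assuming `sc` is an active SparkContext:
# rdd = sc.textFile(combined)
```

Because the comma is the separator, this approach assumes that none of the paths themselves contain a comma.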