
Spark read file from S3 using sc.textFile (s3n://...)

Confirmed that this is related to the Spark build against Hadoop 2.6.0. I just installed Spark 1.4.0 "Pre-built for Hadoop 2.4 and later" (instead of the Hadoop 2.6 build), and the code now works OK.

sc.textFile("s3n://bucketname/Filename") now raises another error:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

The code below uses the s3n URL format, with the credentials embedded in the URL, to show that Spark can read an S3 file. It was run on a dev machine with no Hadoop libraries installed.

scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> lyrics.count
res1: Long = 9

Even better: the code above, with the AWS credentials inline in the s3n URI, breaks if the AWS secret key contains a forward slash "/". Configuring the AWS credentials on the SparkContext fixes that, and the code works whether the S3 file is public or private.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count
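
If you'd rather not hard-code the keys in the script, a small variation of the same idea (a sketch, assuming the conventional AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables are set) is to read them from the environment before handing them to the Hadoop configuration:

// sketch: pull the credentials from environment variables instead of hard-coding them
val accessKey = sys.env("AWS_ACCESS_KEY_ID")
val secretKey = sys.env("AWS_SECRET_ACCESS_KEY") // may contain "/"

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)

val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count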

Difference between sc.textFile and spark.read.text in Spark

To answer (a),

sc.textFile(...) returns an RDD[String]

textFile(String path, int minPartitions)

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

spark.read.text(...) returns a Dataset[Row], i.e. a DataFrame

text(String path)

Loads text files and returns a DataFrame whose schema starts with a string column named "value", followed by partition columns if there are any.
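
A minimal spark-shell sketch of the two calls side by side (the s3n path is a placeholder):

val rdd = sc.textFile("s3n://myBucket/MyFile.txt")           // RDD[String]
val df  = spark.read.text("s3n://myBucket/MyFile.txt")       // DataFrame with a single "value" column
df.printSchema()                                             // root |-- value: string (nullable = true)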

For (b), it really depends on your use case. Since you are trying to create an RDD here, you should go with sc.textFile. You can always convert a DataFrame to an RDD and vice versa, as shown below.
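
A rough sketch of the conversion in either direction (spark-shell; the path is a placeholder):

import spark.implicits._

// DataFrame -> RDD[String]: pull the "value" column out of each Row
val asRdd = spark.read.text("s3n://myBucket/MyFile.txt").rdd.map(_.getString(0))

// RDD[String] -> DataFrame with a single "value" column
val asDf = sc.textFile("s3n://myBucket/MyFile.txt").toDF("value")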

How to map filenames to RDD using sc.textFile(s3n://bucket/*.csv)?

The only text method that includes the file name is wholeTextFiles.

sc.wholeTextFiles(path).map { case (filename, content) => ... }
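
A slightly fuller sketch (bucket and pattern are placeholders) that tags every line with the file it came from. Note that wholeTextFiles reads each file into memory as one string, so it suits many small files rather than a few huge ones:

// (filename, whole file content) pairs, flattened into (filename, line) pairs
val linesByFile = sc.wholeTextFiles("s3n://bucket/*.csv")
  .flatMap { case (filename, content) =>
    content.split("\n").map(line => (filename, line))
  }
linesByFile.take(5).foreach(println)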

Spark cannot read files stored on AWS S3 in Frankfurt region (Ireland region works fine)

Your path is set to s3://; I think it should be s3n://.

Try changing that, along with using these authentication parameters:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3n.awsAccessKeyId", "key")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "secret")

Alternatively, you could try using s3a://, but you'll have to include the hadoop-aws and aws-java-sdk JAR files in your CLASSPATH.
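
As a sketch (key, secret, and bucket are placeholders): s3a uses different property names, and for Frankfurt you would also point it at the eu-central-1 endpoint, since that region only accepts V4 request signing:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "key")
hadoopConf.set("fs.s3a.secret.key", "secret")
// eu-central-1 (Frankfurt) only supports signature V4, so use the regional endpoint
hadoopConf.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

val myRDD = sc.textFile("s3a://myBucket/MyFilePattern")
myRDD.count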

How to load RDDs from S3 files from spark-shell?

org.apache.hadoop.fs.StreamCapabilities is in hadoop-common-3.1.jar.
You are probably mixing versions of Hadoop JARs, which, as covered in the s3a troubleshooting docs, is doomed to fail.

The Spark shell works fine with the right JARs on the classpath. But ASF Spark releases don't work with Hadoop 3.x yet, due to some outstanding issues. Stick to Hadoop 2.8.x and you'll get good S3 performance without so much pain.
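
One low-friction way to get a consistent set of JARs (a sketch; the exact version is illustrative and should match the Hadoop version your Spark build uses) is to let --packages pull hadoop-aws and its matching AWS SDK dependency:

spark-shell --packages org.apache.hadoop:hadoop-aws:2.8.5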

Spark Dataframe parallel read

While waiting for the issue to get fixed, I've found a workaround that works for now. The .json file contains one dictionary per row, so I can first read it in as a text RDD and then turn it into a DataFrame by specifying the columns manually:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(CLUSTER_URL, 'ipython-notebook')
sqlContext = SQLContext(sc)

# read the raw JSON lines as a text RDD (textFile lives on the SparkContext, not the SQLContext)
data = sc.textFile('s3n://bucket/data.json', 30).cache()

# each line is a dict literal, so eval it and map it onto a Row with explicit columns
df_rdd = data \
    .map(lambda x: dict(eval(x))) \
    .map(lambda x: Row(x1=x['x1'], x2=x['x2'], x3=x['x3'], x4=x['x4']))

df = sqlContext.inferSchema(df_rdd).cache()

This is as per the docs. It also means that you could use a .csv file instead of a JSON file (which usually saves a lot of disk space), as long as you manually specify the column names in Spark.


