Python read file as stream from HDFS
You want xreadlines; it reads lines from a file without loading the whole file into memory.
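Note that xreadlines is a Python 2 API (it was removed in Python 3); iterating the file object directly gives the same lazy, line-at-a-time behavior. A minimal self-contained sketch (the file name is hypothetical):

```python
import os
import tempfile

# Create a small sample file to iterate over.
path = os.path.join(tempfile.mkdtemp(), "sample.txt")
with open(path, "w") as f:
    f.write("a\nb\nc\n")

lines = []
with open(path) as f:
    for line in f:  # lazy: one line at a time, whole file never loaded
        lines.append(line.strip())
```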
Edit:
Now that I see your question: you just need to read from the stdout pipe of your Popen object:
cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in cat.stdout:
    print line
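The same pattern works with any subprocess that writes to stdout, so it can be tried without a Hadoop cluster. A minimal sketch, with a shell command standing in for `hadoop fs -cat` (the lazy loop over `proc.stdout` is the part that matters):

```python
import subprocess

# Stand-in for ["hadoop", "fs", "-cat", "/path/to/myfile"]:
# any command that writes lines to stdout behaves the same way.
proc = subprocess.Popen(
    ["sh", "-c", "echo line1; echo line2; echo line3"],
    stdout=subprocess.PIPE,
    universal_newlines=True,  # decode bytes to str
)

lines = []
for line in proc.stdout:      # iterates lazily, one line at a time
    lines.append(line.rstrip("\n"))
proc.wait()
```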
Read HDFS file as stream
The problem comes from the fact that I was watching a specific file. I should have been watching the folder to catch the new files. Apparently, there is no native means to watch a file while data is being appended to it.
How to read the file from hdfs
Get PySpark installed.
text = sc.textFile('hdfs:///project1/mr.txt')
first_line = text.first()
PySpark HDFS data streams reading/writing
After some investigation, I found a solution to my problem. It involves creating some JVM objects via the Spark context and using them for buffered I/O operations:
...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
    raw_in = hdfs.open(f.getPath())  # FSDataInputStream
    in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)
    while in_stream.available() > 0:
        out_stream.write(in_stream.read())
    out_stream.flush()
    in_stream.close()

out_stream.close()
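The underlying pattern, concatenating several inputs into one output through buffered streams, can be sketched without a Spark context using Python's own io module over local files (all file names here are hypothetical stand-ins for the HDFS paths above):

```python
import io
import os
import tempfile

# Create two small "part" files standing in for the HDFS inputs.
tmpdir = tempfile.mkdtemp()
parts = []
for i, payload in enumerate([b"part-0 data\n", b"part-1 data\n"]):
    path = os.path.join(tmpdir, "part-%d" % i)
    with open(path, "wb") as f:
        f.write(payload)
    parts.append(path)

combined = os.path.join(tmpdir, "combined")
with open(combined, "wb") as raw_out:
    out_stream = io.BufferedWriter(raw_out)   # analogue of BufferedOutputStream
    for path in parts:
        with open(path, "rb") as raw_in:
            in_stream = io.BufferedReader(raw_in)  # analogue of BufferedInputStream
            chunk = in_stream.read(8192)
            while chunk:
                out_stream.write(chunk)
                chunk = in_stream.read(8192)
    out_stream.flush()

with open(combined, "rb") as f:
    data = f.read()
```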
How to use a file in a hadoop streaming job using python?
hadoop jar contrib/streaming/hadoop-streaming-1.1.1.jar -file ./mapper.py \
-mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py \
-input test/input.txt -output test/output -file '../user_ids'
Does ../user_ids exist on your local file path when you execute the job? If it does then you need to amend your mapper code to account for the fact that this file will be available in the local working directory of the mapper at runtime:
f = open('user_ids','r')
Streaming files from a tar file in hdfs
Try passing the HDFS file handle to the fileobj argument of tarfile.open:
tf = tarfile.open(fileobj=f)
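A self-contained sketch of the same idea, with an in-memory BytesIO standing in for the HDFS handle f (the archive contents are made up for illustration):

```python
import io
import tarfile

# Build a small tar archive in memory to stand in for the HDFS stream.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
    payload = b"hello from the tar\n"
    info = tarfile.TarInfo(name="hello.txt")
    info.size = len(payload)
    tf.addfile(info, io.BytesIO(payload))

buf.seek(0)  # rewind, as you would after opening the HDFS file

# The key line: any file-like object can be passed via fileobj=
with tarfile.open(fileobj=buf, mode="r") as tf:
    member = tf.getmember("hello.txt")
    data = tf.extractfile(member).read()
```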
Using files in Hadoop Streaming with Python
I was able to resolve the issue after searching for a solution for about 3 days.
The problem is with newer versions of Hadoop (2.2.0 in my case). The mapper code, when reading values from files, was exiting with a non-zero code at some point (maybe because it was reading a huge list of values (784) at a time). Hadoop 2.2.0 has a setting that tells the Hadoop system to treat any non-zero exit as a general error (subprocess failed with code 1). This setting is True by default. I just had to set this property to False, and my code ran without any errors.
Setting is: stream.non.zero.exit.is.failure. Just set it to false when streaming. So the streaming command would be somewhat like:
hadoop jar ... -D stream.non.zero.exit.is.failure=false ...
Hope it helps someone, and saves 3 days... ;)
Problem in creating a separate function to read file in Hadoop Streaming
Try this simplified script:
#!/usr/bin/env python
import sys

def main():
    filename = "my_dict.txt"
    listfile = open(filename)
    # doesn't create an intermediate list
    listDM = set(line.strip() for line in listfile)
    # less Pythonic but significantly faster,
    # still doesn't create an intermediate list:
    # listDM = set(imap(str.strip, listfile))
    listfile.close()
    for line in sys.stdin:
        line = line.strip()
        if line in listDM:
            print '%s\t%d' % (line, 1)

if __name__ == '__main__':
    main()
If you use the faster commented-out alternative, you also need to from itertools import imap.
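The core of that script, the set-membership filter, can be exercised without Hadoop or stdin. A minimal sketch with in-memory data (the function name and sample words are hypothetical):

```python
def filter_lines(lines, dictionary_words):
    # Build the lookup set the same way the mapper does,
    # then emit "word\t1" for each input line found in it.
    listDM = set(w.strip() for w in dictionary_words)
    out = []
    for line in lines:
        line = line.strip()
        if line in listDM:
            out.append("%s\t%d" % (line, 1))
    return out

result = filter_lines(["apple\n", "pear\n", "apple\n"],
                      ["apple\n", "banana\n"])
```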