creating spark data structure from multiline record
PySpark has supported Hadoop Input Formats since version 1.1. You can use the textinputformat.record.delimiter option to set a custom record delimiter, as below:
from operator import itemgetter

retrosheet = sc.newAPIHadoopFile(
    '/path/to/retrosheet/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\nid,'}
)
(retrosheet
    .filter(itemgetter(1))
    .values()
    .filter(lambda x: x)
    .map(lambda v: (
        v if v.startswith('id') else 'id,{0}'.format(v)).splitlines()))
Since Spark 2.4 you can also read the data into a DataFrame using the text reader:
spark.read.option("lineSep", '\nid,').text('/path/to/retrosheet/file')
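A possible follow-up, not from the original answer: with the text reader each row's value column holds one whole record minus the delimiter, so the 'id,' prefix stripped from all records but the first can be restored with DataFrame functions. A minimal sketch, assuming the same retrosheet layout as the RDD version above:

from pyspark.sql import functions as F

df = spark.read.option("lineSep", '\nid,').text('/path/to/retrosheet/file')

# restore the 'id,' prefix that the delimiter consumed on every record but the first
records = df.select(
    F.when(F.col('value').startswith('id'), F.col('value'))
     .otherwise(F.concat(F.lit('id,'), F.col('value')))
     .alias('record')
)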
How to match/extract multi-line pattern from file in pyspark
If you can use '\n<Q' as the delimiter to create RDD elements, then it becomes a pure Python task to parse the data blocks. Below I create a function (based on your sample) to parse the block texts using regexes and retrieve column information into a Row object (you might have to adjust the regexes to reflect the actual data patterns, i.e. case sensitivity, extra white spaces etc.):
- For each RDD element, split it by '\n' (line-mode)
- then for each line, split it by '> <' into a list y
- we can find rank and quantityUnit by checking y[1] and y[2], quantityAmount by checking y[1], and Item_id by checking y[0]
- create a Row object by iterating over all required fields, setting the value to None for missing fields
from pyspark.sql import Row
import re

# skipped the code to initialize SparkSession

# field names to retrieve
cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']

def parse_rdd_element(x, cols):
    try:
        row = {}
        for e in x.split('\n'):
            y = e.split('> <')
            if len(y) < 2:
                continue
            if y[1] in ['rank', 'quantityUnit']:
                row[y[1]] = y[2].split(">")[0]
            else:
                m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                if m:
                    row['quantityAmount'] = m.group(1)
                    continue
                m = re.match(r'^(?:<Q)?(\d+)', y[0])
                if m:
                    row['Item_Id'] = 'Q' + m.group(1)
        # if row is not EMPTY, set None for any missing field
        return Row(**dict([(k, row[k]) if k in row else (k, None) for k in cols])) if row else None
    except:
        return None
Set up the RDD using newAPIHadoopFile() with '\n<Q' as the delimiter:
rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n<Q'}
)
Use the map function to parse each RDD element into a Row object:
rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]
Convert the above RDD to a DataFrame:
df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit| rank|
+-------+--------------+------------+----------+
| Q31| 24954| Meter| BestRank|
| Q25| 582| Kilometer|NormalRank|
+-------+--------------+------------+----------+
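An optional follow-up, not part of the original answer: the parser returns every field as a string, so the numeric column can be cast afterwards if needed:

from pyspark.sql import functions as F

# cast the string column produced by the parser to a numeric type
df = df.withColumn('quantityAmount', F.col('quantityAmount').cast('long'))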
Some Notes:
- For better performance, pre-compile all regex patterns using re.compile() before passing them to the parse_rdd_element() function.
- In case there could be spaces/tabs between '\n' and '<Q', multiple blocks will be added into the same RDD element; just split the RDD element by '\n\s+<Q' and replace map() with flatMap() (see the sketch after this list).
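A minimal sketch of the second note, with the block-splitting pattern pre-compiled as well; BLOCK_PTN and parse_blocks() are hypothetical helpers built on top of parse_rdd_element() from above:

import re

# pre-compile the block separator once, outside the per-record code path
BLOCK_PTN = re.compile(r'\n\s+<Q')

def parse_blocks(x, cols):
    # one RDD element may now contain several blocks if whitespace precedes '<Q';
    # split them apart and parse each block with the function defined above
    for block in BLOCK_PTN.split(x):
        row = parse_rdd_element(block, cols)
        if row is not None:
            yield row

# replace map() with flatMap(), since one element can now yield several Rows
df = rdd.flatMap(lambda x: parse_blocks(x[1], cols)).toDF()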
Reference: creating spark data structure from multiline record
PySpark: read, map and reduce from multiline record textfile with newAPIHadoopFile
Personally I would:
extend the delimiter with '::':
sheet = sc.newAPIHadoopFile(
    path,
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
)
drop keys:
values = sheet.values()
filter out empty entries:
non_empty = values.filter(lambda x: x)
split:
grouped_lines = non_empty.map(str.splitlines)
separate keys and values:
from operator import itemgetter
pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
and finally split values:
pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
All of that can be done with a single function, of course:
import dateutil.parser

def process(pair):
    _, content = pair
    clean = [x.strip() for x in content.strip().splitlines()]
    if not clean:
        return
    k, vs = clean[0], clean[1:]
    for v in vs:
        try:
            ds, x = v.split("\t")
            yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
        except ValueError:
            pass
sheet.flatMap(process)
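A possible follow-up, not part of the original answer: the resulting (key, (timestamp, value)) pairs can be turned into a DataFrame; the column names below (channel, ts, value) are hypothetical:

from pyspark.sql import Row

rows = sheet.flatMap(process).map(
    lambda kv: Row(channel=kv[0], ts=kv[1][0], value=kv[1][1]))

# schema (string, timestamp, double) is inferred from the Row objects
df = spark.createDataFrame(rows)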
How to enable multiline reading of a csv file in pyspark
From my analysis, this cannot be done through sc.textFile(): as soon as the S3 file is loaded into an RDD, the RDD contains one element per physical line of the file, so each line within a multiline record is already split into a separate element. Hence it cannot be achieved through sc.textFile().
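Not part of the original answer, but worth noting as an aside: the DataFrame CSV reader exposes a multiLine option (available since Spark 2.2) that keeps quoted fields containing newlines inside a single logical record, which is the usual workaround; the path and the header/quote settings below are assumptions for the sketch:

# multiLine keeps newlines inside quoted fields within one logical record
df = (spark.read
      .option("header", "true")
      .option("multiLine", "true")
      .option("quote", '"')
      .option("escape", '"')
      .csv("s3a://bucket/path/multiline.csv"))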
Reading in multiline text files
It's fine, as reading in from disk preserves order; filter is a narrow transformation, and zip relies on this fact. There are no wide transformations before the zip.
Alternatively you can zipWithIndex and then join on the zipped index in an appropriate manner (see the sketch below). This is a narrow transformation, so there is no issue.
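A minimal sketch of the zipWithIndex + join alternative; the file names are hypothetical and the two files are assumed to correspond line by line:

# index each line, then key by that index so the two RDDs can be joined
a = sc.textFile("/path/file_a.txt").zipWithIndex().map(lambda x: (x[1], x[0]))
b = sc.textFile("/path/file_b.txt").zipWithIndex().map(lambda x: (x[1], x[0]))

# join on the line index instead of relying on zip's partition alignment
paired = a.join(b).values()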
PySpark (Python): loading multiline records via SparkContext.newAPIHadoopFile
Each (key, value) pair returned by o.a.h.mapreduce.lib.input.TextInputFormat is a single local data structure containing an offset (long) and a string. There is no mechanism which can split a value between multiple records without creating a custom Hadoop InputFormat.
"Thousands of lines" is not very precise description but as a rule of thumb:
- If on-disk size is less than a few megabytes you're most likely good to go.
- Otherwise you'll have to keep track of memory usage and GC and adjust configuration.
Also keep in mind that large records can result in suboptimal resource utilization. In the worst case scenario you can end up with a single record per task where cost of bookkeeping can be much higher than the actual execution.