Creating Spark Data Structure from Multiline Record

PySpark has supported Hadoop Input Formats since version 1.1. You can use the textinputformat.record.delimiter option to set a custom record delimiter, as below:

from operator import itemgetter

retrosheet = sc.newAPIHadoopFile(
    '/path/to/retrosheet/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\nid,'}
)

(retrosheet
    .filter(itemgetter(1))
    .values()
    .filter(lambda x: x)
    .map(lambda v: (
        v if v.startswith('id') else 'id,{0}'.format(v)).splitlines()))

Since Spark 2.4 you can also read the data into a DataFrame using the text reader:

spark.read.option("lineSep", '\nid,').text('/path/to/retrosheet/file')
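
As a rough follow-up sketch (the prefix handling below is an assumption derived from the '\nid,' separator, not part of the original answer), the stripped 'id,' prefix can be restored on the DataFrame side much like in the RDD version:

from pyspark.sql import functions as F

# every record except the first loses its leading 'id,' because the custom line
# separator consumes it; add it back and drop empty records
df = spark.read.option("lineSep", "\nid,").text("/path/to/retrosheet/file")

records = (df
    .filter(F.col("value") != "")
    .withColumn("value",
                F.when(F.col("value").startswith("id"), F.col("value"))
                 .otherwise(F.concat(F.lit("id,"), F.col("value")))))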

How to match/extract multi-line pattern from file in PySpark

If you can use \n<Q as the delimiter to create RDD elements, then it becomes a pure Python task to parse the data blocks. Below I create a function (based on your sample) to parse the block text using regexes and retrieve the cols information into a Row object (you might have to adjust the regexes to reflect the actual data patterns, i.e. case sensitivity, extra whitespace, etc.):

  • For each RDD element, split it by '\n' (line mode),
  • then for each line, split it by '> <' into a list y.
  • We can find rank and quantityUnit by checking y[1] and y[2], quantityAmount by checking y[1], and Item_Id by checking y[0].
  • Create a Row object by iterating over all required fields, setting the value to None for missing fields.

    from pyspark.sql import Row
    import re

    # skipped the code to initialize SparkSession

    # field names to retrieve
    cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']

    def parse_rdd_element(x, cols):
        try:
            row = {}
            for e in x.split('\n'):
                y = e.split('> <')
                if len(y) < 2:
                    continue
                if y[1] in ['rank', 'quantityUnit']:
                    row[y[1]] = y[2].split(">")[0]
                else:
                    m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                    if m:
                        row['quantityAmount'] = m.group(1)
                        continue
                    m = re.match(r'^(?:<Q)?(\d+)', y[0])
                    if m:
                        row['Item_Id'] = 'Q' + m.group(1)
            # if row is not EMPTY, set missing fields to None
            return Row(**dict([(k, row[k]) if k in row else (k, None) for k in cols])) if row else None
        except:
            return None

Set up the RDD using newAPIHadoopFile() with \n<Q as the delimiter:

rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n<Q'}
)

Use the map function to parse each RDD element into a Row object:

rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]

Convert the above RDD to a DataFrame:

df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit|      rank|
+-------+--------------+------------+----------+
|    Q31|         24954|       Meter|  BestRank|
|    Q25|           582|   Kilometer|NormalRank|
+-------+--------------+------------+----------+

Some Notes:

  • For better performance, pre-compile all regex patterns using re.compile() before using them in the parse_rdd_element() function (see the sketch after this list).

  • In case there could be spaces/tabs between \n and <Q, multiple blocks will end up in the same RDD element; just split the RDD element by \n\s+<Q and replace map() with flatMap(), as shown below.
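
A minimal sketch combining both notes, assuming the same rdd, cols and parse_rdd_element() as above (the names BLOCK_SPLIT and rows are mine):

    import re

    # compile the split pattern once (first note) and let flatMap() emit one Row
    # per '<Q...' block when several blocks land in the same RDD element (second note)
    BLOCK_SPLIT = re.compile(r'\n\s+<Q')

    rows = (rdd
            .flatMap(lambda x: [parse_rdd_element(b, cols)
                                for b in BLOCK_SPLIT.split(x[1])])
            .filter(bool))
    df = rows.toDF()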

Reference: creating spark data structure from multiline record

PySpark: read, map and reduce from multiline record textfile with newAPIHadoopFile

Personally I would:

  • extend delimiter with ::

    sheet = sc.newAPIHadoopFile(
        path,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
        conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
    )
  • drop keys:

    values = sheet.values()
  • filter out empty entries:

    non_empty = values.filter(lambda x: x)
  • split:

    grouped_lines = non_empty.map(str.splitlines)
  • separate keys and values:

    from operator import itemgetter

    pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
  • and finally split values:

    pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])

All of that can be done with a single function, of course:

import dateutil.parser

def process(pair):
    _, content = pair
    clean = [x.strip() for x in content.strip().splitlines()]
    if not clean:
        return
    k, vs = clean[0], clean[1:]
    for v in vs:
        try:
            ds, x = v.split("\t")
            yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
        except ValueError:
            pass

sheet.flatMap(process)
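
As a rough usage sketch of the "reduce" part (the averaging below is my own example, not the asker's actual aggregation), the resulting (key, (timestamp, value)) pairs can then be reduced per key:

records = sheet.flatMap(process)  # (key, (timestamp, value)) pairs

# example reduction: average the numeric reading per key
averages = (records
    .mapValues(lambda tv: (tv[1], 1))                      # keep the value and a count of 1
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # sum values and counts
    .mapValues(lambda s: s[0] / s[1]))                     # sum / count = average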

How to enable multiline reading of a csv file in pyspark

In my analysis I found that it cannot be done through sc.textFile(). The reason is that as soon as we load the S3 file into an RDD, the RDD has one element per line of the file. At that level each line within a multiline record is already split into a separate record, so it cannot be achieved through sc.textFile().
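
For reference, a minimal sketch of the DataFrame route implied by the question title: the CSV reader's multiLine option (Spark 2.2+) keeps quoted newlines inside a single record instead of splitting them per line. The path and quoting options below are assumptions about the data:

# read a CSV whose quoted fields contain embedded newlines
df = (spark.read
      .option("header", "true")
      .option("multiLine", "true")   # keep quoted newlines inside one record
      .option("quote", '"')
      .option("escape", '"')
      .csv("s3a://bucket/path/to/file.csv"))   # path is an assumption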

Reading in multiline text files

It's fine, because reading from disk preserves order, filter is a narrow transformation, and zip relies on that fact; there are no wide transformations before the zip.

Alternatively, you can zipWithIndex and then join on the zipped index value in an appropriate manner. This is a narrow transformation, so there is no issue.
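
A minimal sketch of the zipWithIndex-plus-join variant, assuming two line-aligned text files standing in for the question's RDDs (the file names are illustrative assumptions):

left = sc.textFile('/path/to/left.txt')    # assumption: one half of each record per line
right = sc.textFile('/path/to/right.txt')  # assumption: the matching half, in the same order

# attach explicit positional indices, key by them, then join on the index
left_i = left.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
right_i = right.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
joined = left_i.join(right_i).values()   # (left_line, right_line) pairs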

PySpark (Python): loading multiline records via SparkContext.newAPIHadoopFile

Each (key, value) pair returned by o.a.h.mapreduce.lib.input.TextInputFormat is a single local data structure containing an offset (long) and a string. There is no mechanism that can split the value between multiple records without creating a custom Hadoop InputFormat.

"Thousands of lines" is not very precise description but as a rule of thumb:

  • If on-disk size is less than a few megabytes you're most likely good to go.
  • Otherwise you'll have to keep track of memory usage and GC and adjust configuration.

Also keep in mind that large records can result in suboptimal resource utilization. In the worst-case scenario you can end up with a single record per task, where the cost of bookkeeping can be much higher than the cost of the actual execution.


