How to Read a List of Parquet Files from S3 as a Pandas Dataframe Using Pyarrow

How to read a list of parquet files from S3 as a pandas dataframe using pyarrow?

You should use the s3fs module, as proposed by yjk21. However, calling ParquetDataset gives you a pyarrow.parquet.ParquetDataset object rather than a DataFrame. To get a pandas DataFrame, call .read_pandas().to_pandas() on it:

import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
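
If you have an explicit list of parquet objects rather than a whole bucket or prefix, ParquetDataset also accepts a list of paths. A minimal sketch, using hypothetical bucket and key names:

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()

# hypothetical list of individual parquet objects
files = [
    's3://your-bucket/data/part-0000.parquet',
    's3://your-bucket/data/part-0001.parquet',
]

# ParquetDataset accepts a list of file paths as well as a directory
pandas_dataframe = pq.ParquetDataset(files, filesystem=s3).read_pandas().to_pandas()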

pyarrow reading parquet from S3 performance confusions

You are correct. Option 2 is just option 1 under the hood.
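
For context, a rough sketch of the two approaches being compared, assuming option 1 refers to pandas' built-in reader and option 2 to calling pyarrow directly (the S3 path is a placeholder):

import pandas as pd
import pyarrow.parquet as pq

path = "s3://tpc-h-parquet/lineitem/part0.snappy.parquet"

# option 1: let pandas dispatch to pyarrow
df1 = pd.read_parquet(path, engine="pyarrow")

# option 2: call pyarrow yourself, then convert to pandas
df2 = pq.read_table(path).to_pandas()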

What is the fastest way for me to read a Parquet file into Pandas?

Both option 1 and option 2 are probably good enough. However, if you are trying to shave off every last bit of time, you may need to go one layer deeper, depending on your pyarrow version. It turns out that option 1 is itself just a proxy, in this case to the datasets API:

import pyarrow.dataset as ds
dataset = ds.dataset("s3://tpc-h-parquet/lineitem/part0.snappy.parquet")
table = dataset.to_table(use_threads=True)
df = table.to_pandas()

For pyarrow versions >= 4 and < 7 you can usually get slightly better performance on S3 using the asynchronous scanner:

import pyarrow.dataset as ds
dataset = ds.dataset("s3://tpc-h-parquet/lineitem/part0.snappy.parquet")
table = dataset.to_table(use_threads=True, use_async=True)
df = table.to_pandas()

In pyarrow version 7 the asynchronous scanner is the default, so you can once again simply use pd.read_parquet("s3://tpc-h-parquet/lineitem/part0.snappy.parquet").
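
For completeness, a minimal sketch of that call (the engine argument is optional and shown only to make the pyarrow dependency explicit):

import pandas as pd

# with pyarrow >= 7 installed, the asynchronous scanner is used by default
df = pd.read_parquet(
    "s3://tpc-h-parquet/lineitem/part0.snappy.parquet",
    engine="pyarrow",
)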

Retrieving data from multiple parquet files into one dataframe (Python)

A variation of @Learning is a mess's answer, but using dd.concat:

from dask.dataframe import read_parquet, concat
dask_df = concat([read_parquet(f) for f in data_files])
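
Note that the result is a lazy Dask dataframe; if you need a regular in-memory pandas DataFrame at the end, call .compute() on it. A minimal sketch, assuming data_files is the same list of parquet paths as above:

from dask.dataframe import read_parquet, concat

# data_files is assumed to be a list of parquet file paths
dask_df = concat([read_parquet(f) for f in data_files])

# Dask is lazy; compute() materialises the result as a pandas DataFrame
pandas_df = dask_df.compute()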

How to read partitioned parquet files from S3 using pyarrow in python

I managed to get this working with the latest releases of fastparquet & s3fs. Below is the code:

import s3fs
import fastparquet as fp

# use s3fs as the filesystem
fs = s3fs.S3FileSystem()
myopen = fs.open

# example key layout:
# mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
# convert to pandas dataframe
df = fp_obj.to_pandas()

Credits to Martin for pointing me in the right direction via our conversation.

NB: This would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.

I did a quick benchmark on individual iterations with pyarrow, and on the list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code, but I reckon pyarrow + s3fs will be faster once that support is implemented.

The code and benchmarks are below:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # the following code extracts the serial_number & cur_date values so that we can add them to the dataframe
...         # probably not the best way to split :)
...         elements_list = current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()', number=10, globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()', number=10, globals=globals())
2.961556333000317

Update 2019

The relevant PRs and issues, such as ARROW-2038 and fastparquet PR #182, have since been resolved.

Read parquet files using Pyarrow

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'  # if it's a directory, omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
>>> bucket_uri
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()
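
With newer pyarrow versions, a roughly equivalent way to read a partitioned directory is the dataset API with hive-style partitioning. A sketch, reusing the same placeholder bucket and prefix:

import pyarrow.dataset as ds
import s3fs

fs = s3fs.S3FileSystem()

# directories like serial_number=1/cur_date=20-12-2012 become partition columns
dataset = ds.dataset(
    'your-bucket-name/directory_name',
    filesystem=fs,
    format='parquet',
    partitioning='hive',
)
df = dataset.to_table().to_pandas()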

Read parquet files using fastparquet

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, and so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> myopen = fs.open  # use s3fs to open the files
>>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()

Quick benchmarks

This is probably not the best way to benchmark it; please read the blog post for a thorough benchmark.

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()', number=10, globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()', number=10, globals=globals())
2.931876824000028

Further reading regarding Pyarrow's speed

Reference:

  • fastparquet
  • s3fs
  • pyarrow
  • pyarrow code, based on discussion and documentation
  • fastparquet code, based on the discussion in PR-182 and documentation

Is there any way to capture the input file name of multiple parquet files read in with a wildcard in pandas/awswrangler?

You'll need to add the filename as a new column to each dataframe as you load it. For example, here is how to do this with a set of CSV files, since that is easier to run as an example; you would follow the same pattern for parquet files (a parquet sketch follows after the CSV example).

from pathlib import Path

import pandas as pd

# write a couple fake csv files to our disk as examples
Path("csv1.csv").write_text("a,b\n1,2\n1,2")
Path("csv2.csv").write_text("b,c\n3,4\n3,4")

all_dfs = []

# for every csv file that we want to load
for f in Path(".").glob("csv*.csv"):

# read the csv
df = pd.read_csv(f)

# add a filename column to the dataframe
df["filename"] = f.name

# store the dataframe to concat later
all_dfs.append(df)

# put together the dataframes for each file
pd.concat(all_dfs)
#> a b filename c
#> 0 1.0 2 csv1.csv NaN
#> 1 1.0 2 csv1.csv NaN
#> 0 NaN 3 csv2.csv 4.0
#> 1 NaN 3 csv2.csv 4.0
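
A sketch of the same pattern applied to parquet files on S3; the bucket, prefix, and use of s3fs here are assumptions to adapt to your own layout:

import pandas as pd
import s3fs

fs = s3fs.S3FileSystem()

all_dfs = []

# glob the parquet objects under a hypothetical prefix
for key in fs.glob("your-bucket/your-prefix/*.parquet"):

    # read one file at a time so we still know where each row came from
    with fs.open(key, "rb") as f:
        df = pd.read_parquet(f)

    # record the source object key as a column
    df["filename"] = key

    # store the dataframe to concat later
    all_dfs.append(df)

# put together the dataframes for each file
result = pd.concat(all_dfs)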

