Comparison of Python Parquet Engines for Parallelisation and Cloud Object Storage

AgriEnvCoder
Dec 1, 2023


Parquet is an open-source columnar storage format that is designed for efficient data storage and processing. It is particularly well-suited for analytics and data processing workloads in the Big Data ecosystem.

Furthermore, to handle large-scale data processing and hosting, parallelisation and cloud object storage are critically important. This article therefore dives into a comparison of different Python Parquet engines (PyArrow, fastparquet, PySpark, and awswrangler) in the context of parallelisation and S3 cloud object storage integration. The analysis is driven by the requirement of parallel writing to S3 over the HTTP protocol.

(The code and instructions are in Python; I cannot guarantee the same behaviour in other languages.)

First, I asked ChatGPT to generate a mock Pandas DataFrame that I can write as a partitioned Parquet dataset and use as a running example:

import pandas as pd
from datetime import datetime, timedelta
import random
import os

# Function to generate random years
def random_years(start_year, end_year, n=10):
    year_list = [random.randint(start_year, end_year) for _ in range(n)]
    return year_list

# Generating random data
start_year = 2020
end_year = 2022
countries = ["Country{}".format(i) for i in range(1, 11)]
population = [random.randint(1000000, 10000000) for _ in range(30)]

# Creating DataFrame
data = {
    'Year': random_years(start_year, end_year, 30),
    'Country': random.choices(countries, k=30),
    'Population': population
}

df = pd.DataFrame(data)

# Sorting the DataFrame by year
df = df.sort_values(by='Year').reset_index(drop=True)

# Displaying the DataFrame
print(df)

Before diving in, if you do not have an HTTP server exposing S3-compatible storage, you can follow the instructions here to deploy one.

PyArrow

PyArrow is a cross-language development platform for in-memory analytics. It provides a standardized language-independent columnar memory format for analytics.

import pyarrow as pa
import pyarrow.fs  # needed so pa.fs is available
import pyarrow.parquet as pq

# To avoid a memory leak, see https://github.com/apache/arrow/issues/18431
pa.jemalloc_set_decay_ms(0)
# Specify your AWS credentials and S3 details
aws_access_key_id = 'YOUR_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_SECRET_ACCESS_KEY'
scheme = 'http'
endpoint = 'your-s3-endpoint'
s3_bucket = 'your-s3-bucket'
s3_prefix = 'path/to/your/folder/in/s3'

# Set up S3 filesystem
s3 = pa.fs.S3FileSystem(scheme=scheme,
                        access_key=aws_access_key_id,
                        secret_key=aws_secret_access_key,
                        endpoint_override=endpoint)

# Convert DataFrame to PyArrow Table
table = pa.Table.from_pandas(df)

# Specify the full S3 path (including the filename)
#s3_path = f's3://{s3_bucket}/{s3_prefix}/dataset'
s3_path = f'{s3_bucket}/{s3_prefix}/dataset'

# Write to the dataset in S3
pq.write_to_dataset(table, root_path=s3_path, filesystem=s3, use_legacy_dataset=False,
                    partition_cols=['Year', 'Country'])

print(f"Parquet dataset saved to: {s3_path}")


# Read parquet dataset from S3
df1 = pq.ParquetDataset(s3_path, filesystem=s3, filters=[('Year', '=', 2022)]).read().to_pandas()
df1

PyArrow is well suited to streaming data from tables into files. write_to_dataset creates a dataset structure that ingests tables and lays out the resulting files according to the arguments given (partition_cols, root_path, etc.). See the function documentation here.
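
For the mock DataFrame above, the hive-style key=value layout produced by partition_cols=['Year','Country'] looks roughly like this (the actual file names will differ):

dataset/
    Year=2020/
        Country=Country1/
            part-0.parquet
        Country=Country2/
            ...
    Year=2021/
        ...
    Year=2022/
        ...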

Unfortunately, datasets written with write_to_dataset over the S3 protocol can be difficult to read back. See this issue. If you are working locally, however, this is not a problem at all.

Last but not least, PyArrow has a memory-leak problem. When writing a huge amount of data, it is necessary to call pa.jemalloc_set_decay_ms(0) to avoid crashing the workstation.
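
Since the goal here is parallel writing, below is a minimal sketch of how several processes could append to the same PyArrow dataset. This is my own illustration, not from PyArrow's documentation: the helper write_chunk and the 4-way split are assumptions, and a unique basename_template is passed so that concurrent writers do not overwrite each other's part files.

import uuid
from concurrent.futures import ProcessPoolExecutor

import pandas as pd
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq

def write_chunk(chunk: pd.DataFrame) -> None:
    # Each worker opens its own S3 connection and appends new files
    # into the same partitioned dataset layout.
    s3 = pa.fs.S3FileSystem(scheme='http',
                            access_key='YOUR_ACCESS_KEY_ID',
                            secret_key='YOUR_SECRET_ACCESS_KEY',
                            endpoint_override='your-s3-endpoint')
    table = pa.Table.from_pandas(chunk)
    pq.write_to_dataset(table,
                        root_path='your-s3-bucket/path/to/your/folder/in/s3/dataset',
                        filesystem=s3,
                        partition_cols=['Year', 'Country'],
                        # unique file names so parallel writers never collide
                        basename_template=f'part-{uuid.uuid4()}-{{i}}.parquet')

if __name__ == '__main__':
    chunks = [df.iloc[i::4] for i in range(4)]  # df is the mock DataFrame from above
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(write_chunk, chunks))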

Fastparquet

Fastparquet is a Python library for reading and writing Parquet files quickly. It is designed to be efficient and lightweight.

import fastparquet
import s3fs
# Specify your AWS credentials and S3 details
aws_access_key_id = 'YOUR_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_SECRET_ACCESS_KEY'
s3_bucket = 'your-s3-bucket'
s3_prefix = 'path/to/your/folder/in/s3'
endpoint = 'your-s3-endpoint'

s3_options = {
    "key": aws_access_key_id,
    "secret": aws_secret_access_key,
    "client_kwargs": {"endpoint_url": 'http://' + endpoint}
}

s3 = s3fs.S3FileSystem(**s3_options)


# Specify the full S3 path (including the filename)
s3_path = f'{s3_bucket}/{s3_prefix}/file.parquet'

# Write DataFrame to a partitioned Parquet file using fastparquet
# (fastparquet uses partition_on together with file_scheme='hive' for partitioning)
fastparquet.write(s3_path, df, file_scheme='hive', partition_on=['Year', 'Country'],
                  open_with=s3.open, mkdirs=lambda p: None)  # no-op mkdirs: S3 has no real directories
#fastparquet.write(s3_path, df, file_scheme='hive', partition_on=['Year', 'Country'],
#                  append=True, open_with=s3.open, mkdirs=lambda p: None)

print(f"Parquet file saved to: {s3_path}")

# Read the Parquet data back from S3 using fastparquet
df1 = fastparquet.ParquetFile(s3_path, open_with=s3.open).to_pandas()

# Display the DataFrame
print(df1)

Reading from S3 is seamless compared with PyArrow. However, fastparquet writes a "file" rather than a dataset. The only option for streaming tables in is append: with append=True, the Parquet file is updated along with its metadata on every write.

The problem then becomes the risk of concurrent access when the streaming happens in parallel. See the discussion here. Fastparquet is therefore not ideal for parallel computation, even though it is more robust than PyArrow when writing directly to S3.
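
If you do try to stream in parallel with fastparquet, the appends have to be serialized so that only one writer touches the file and its metadata at a time. A minimal sketch, reusing s3 and s3_path from the snippet above; the lock-based pattern is my own illustration, not a fastparquet feature:

from multiprocessing import Lock

write_lock = Lock()  # shared between writer processes

def append_chunk(chunk):
    # Only one process may update the Parquet data and metadata at a time
    with write_lock:
        fastparquet.write(s3_path, chunk,
                          file_scheme='hive',
                          partition_on=['Year', 'Country'],
                          append=True,
                          open_with=s3.open)

This works, but it effectively turns the parallel writers into a queue, which is exactly why fastparquet is not the engine I would pick for this workload.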

PySpark

Apache Spark is a distributed computing system that provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. PySpark is the Python API for Apache Spark, allowing users to write Spark applications using Python.

from pyspark.sql import SparkSession

JAVA_HOME = "/Library/Java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home"
os.environ["PATH"] = f"{JAVA_HOME}/bin:{os.environ['PATH']}" # Note /bin is added to JAVA_HOME path.

# Set up PySpark session and Set AWS credentials for PySpark
spark = SparkSession.builder \
    .appName("parquet_writer") \
    .config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC") \
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC") \
    .getOrCreate()

# Specify your AWS credentials and S3 details
aws_access_key_id = 'YOUR_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_SECRET_ACCESS_KEY'
s3_bucket = 'your-s3-bucket'
s3_key = 'path/to/your/file/in/s3/dataset.parquet'
endpoint = 'your-s3-endpoint'

# Specify the full S3 path (including the filename)
s3_path = f'http://{endpoint}/{s3_bucket}/{s3_key}'

df = spark.createDataFrame(pd.DataFrame(data))

# Write DataFrame to Parquet file in S3
df.write.partitionBy('Year','Country').parquet(s3_path, mode='append')

# Read Parquet file from S3 into DataFrame
df_read = spark.read.parquet(s3_path).filter("Year = '2022'")

# Display the DataFrame
df_read.show()

# Stop the Spark session
spark.stop()
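
The snippet above hands Spark a plain http:// URL. In the Spark setups I am aware of, an S3-compatible endpoint is usually reached through the s3a:// connector instead, which needs the hadoop-aws package on the classpath plus a few extra configs. The package version below is an assumption, so treat this as a sketch rather than a drop-in replacement:

# Hedged sketch: s3a connector configuration for an S3-compatible HTTP endpoint
spark = SparkSession.builder \
    .appName("parquet_writer_s3a") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{endpoint}") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

s3a_path = f"s3a://{s3_bucket}/{s3_key}"
df.write.partitionBy('Year', 'Country').parquet(s3a_path, mode='append')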

Although PySpark emphasizes distributed computing, I personally do not recommend it, since it requires setting up and configuring a Java environment. See the most common problem with PySpark here. If you do not have Java, you first need to install it and point the program to it.

Moreover, PySpark is relatively slow compared with PyArrow and fastparquet on a single core.

awswrangler

AWS Data Wrangler is a Python library developed by AWS that simplifies data engineering tasks on Amazon Web Services (AWS). It provides abstractions and utilities to interact with AWS services, making it easier to work with data in the cloud.

import awswrangler as wr
import boto3

# Specify your AWS credentials and S3 details
aws_access_key_id = 'YOUR_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_SECRET_ACCESS_KEY'
s3_bucket = 'your-s3-bucket'
s3_key = 'path/to/your/file/in/s3/dataset.parquet'

# Set AWS credentials for awswrangler
sess = boto3.Session(aws_access_key_id=aws_access_key_id,
                     aws_secret_access_key=aws_secret_access_key)

# Specify the full S3 path (including the filename)
s3_path = f'{s3_bucket}/{s3_key}'



# Write DataFrame to Parquet file in S3 using awswrangler
wr.s3.to_parquet(
    df=df,
    path=f's3://{s3_path}',  # the s3:// prefix is required even over the HTTP protocol
    dataset=True,
    concurrent_partitioning=True,
    partition_cols=['Year', 'Country'],
    boto3_session=sess)


# Partition values are passed to the filter as strings
my_filter = lambda x: x["Year"] == "2022"
# Read Parquet file from S3 into DataFrame using awswrangler
df1 = wr.s3.read_parquet(f's3://{s3_path}', partition_filter=my_filter, dataset=True, boto3_session=sess)

# Display the DataFrame
print(df1)

awswrangler is designed for Amazon S3. It supports dataset writing and works seamlessly with S3 compared with PyArrow. The only thing worth noting is that awswrangler only accepts the s3:// prefix, even if the storage is exposed over HTTP or another protocol. See the support and the discussion here. However, boto3 can manage the S3 connection in Python: setting up a session lets awswrangler recognize the S3 object store even when it is served over HTTP.
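
If the session alone does not pick up the custom endpoint, awswrangler also exposes a global endpoint override that can be set once before calling wr.s3.to_parquet. A minimal sketch, using the same endpoint placeholder as the snippets above:

import awswrangler as wr

# Point awswrangler at the custom, HTTP-served S3-compatible endpoint
wr.config.s3_endpoint_url = 'http://your-s3-endpoint'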

However, awswrangler does not allow passing a partition path directly to retrieve the object. If the partition overhead is huge, awswrangler's read_parquet will take time to filter and retrieve the data. PyArrow supports a better solution here: it lets you address a partition by its path and skip the search entirely, which gives more flexibility when the partition path is known. For example:

# Skip searching partitions for Year by giving it in the path
# (s3 is the PyArrow S3FileSystem set up in the earlier snippet)
df1 = pq.ParquetDataset(f'{s3_path}/Year=2022', filesystem=s3).read().to_pandas()
df1

Conclusion

Dataset writing is supported by PyArrow and awswrangler, which makes them the better candidates for parallel file writing. Comparing these two engines, awswrangler works seamlessly with the S3 connection, while PyArrow allows reading from an explicit partition path, which gives more flexibility and efficiency when reading a partitioned Parquet dataset.

Table: a matrix of Parquet engines versus S3 integration (support of cloud object storage) and file parallelisation (support of dataset writing).

Personally, I recommend using the awswrangler engine to create the dataset, and switching to PyArrow for partition data retrieval.

import awswrangler as wr
import boto3
import pyarrow as pa
import pyarrow.fs  # needed so pa.fs is available
import pyarrow.parquet as pq

# Specify your AWS credentials and S3 details
aws_access_key_id = 'YOUR_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_SECRET_ACCESS_KEY'
s3_bucket = 'your-s3-bucket'
s3_key = 'path/to/your/file/in/s3/dataset.parquet'
endpoint = 'your-s3-endpoint'

# Set AWS credentials for awswrangler
sess = boto3.Session(aws_access_key_id=aws_access_key_id,
                     aws_secret_access_key=aws_secret_access_key)
# If the endpoint is served over HTTP, point awswrangler at it (see the note above)
wr.config.s3_endpoint_url = f'http://{endpoint}'

# Specify the full S3 path (including the filename)
s3_path = f'{s3_bucket}/{s3_key}'

# Write DataFrame to a Parquet dataset in S3 using awswrangler
wr.s3.to_parquet(
    df=df,
    path=f's3://{s3_path}',  # the s3:// prefix is required even over the HTTP protocol
    dataset=True,
    concurrent_partitioning=True,
    partition_cols=['Year', 'Country'],
    boto3_session=sess)

# Switch to PyArrow for retrieval: point it at the known partition path
s3 = pa.fs.S3FileSystem(scheme='http',
                        access_key=aws_access_key_id,
                        secret_key=aws_secret_access_key,
                        endpoint_override=endpoint)
df1 = pq.ParquetDataset(f'{s3_path}/Year=2022', filesystem=s3).read().to_pandas()

# Display the DataFrame
print(df1)
