Parquet data filtering with Pandas

Posted on 13/10/2023

When it comes to filtering data from Parquet files with pandas, several strategies can be employed. Partitioning the data is widely recognized as a way to significantly speed up filtering, but it is only one of the options; there are additional methods to optimize the performance of querying data stored in Parquet files.

Filtering by partitioned fields

As previously mentioned, this approach is not only the most familiar but also typically the most impactful in terms of performance. The rationale behind this is straightforward: when partitions are employed, whole files, or even entire directories of files, can be skipped (a.k.a. predicate pushdown), resulting in a dramatic improvement in performance.

import pandas as pd
import time
from faker import Faker

fake = Faker()

MIL=1000000
NUM_OF_RECORDS=10*MIL
FOLDER="/tmp/out/"
PARTITIONED_PATH=f"{FOLDER}partitioned_{NUM_OF_RECORDS}/"
NON_PARTITIONED_PATH=f"{FOLDER}non_partitioned_{NUM_OF_RECORDS}.parquet"

print(f"Creating fake data")
data = {
    'id': range(NUM_OF_RECORDS),  # Generate IDs from 1 to 100
    'name': [fake.name() for _ in range(NUM_OF_RECORDS)],
    'age': [fake.random_int(min=18, max=99) for _ in range(NUM_OF_RECORDS)],
    'state': [fake.state() for _ in range(NUM_OF_RECORDS)],
    'city': [fake.city() for _ in range(NUM_OF_RECORDS)],
    'street': [fake.street_address() for _ in range(NUM_OF_RECORDS)]
}

df = pd.DataFrame(data)

# writing without partitions
df.to_parquet(path=NON_PARTITIONED_PATH)

# writing partitioned data
df.to_parquet(path=PARTITIONED_PATH, partition_cols=['state'])

# reading non partitioned
start_time = time.time()
df1 = pd.read_parquet(path=NON_PARTITIONED_PATH)
df1 = df1[df1['state']=='California']
runtime1 = (time.time()) - start_time  # 37 sec

# reading partitioned data
start_time = time.time()
df2 = pd.read_parquet(path=PARTITIONED_PATH, filters=[('state','==','California')])
runtime2 = (time.time()) - start_time # 0.20 sec

The time improvement (along with reduced memory and CPU usage) is substantial, decreasing from 37 seconds to just 0.20 seconds.

Filtering by non-partitioned fields

In the example above, we observed how filtering based on a partitioned field can enhance data retrieval. However, there are scenarios where the data can't be effectively partitioned by the specific field we wish to filter by. Moreover, in some cases, filtering is required based on multiple fields. In these situations all input files will be opened, which hurts performance.

Thankfully, Parquet offers a clever solution to mitigate this issue. Parquet files are split into row groups, and within each row group Parquet stores metadata. This metadata includes the minimum and maximum values for each field.

When writing Parquet files with Pandas, you can choose the number of records in each row group.

When using Pandas to read Parquet files with filters, the Pandas library leverages this metadata to efficiently filter the data loaded into memory. If the filtered value falls outside the min/max range of a row group, that entire row group is skipped.

df = pd.DataFrame(data)

# writing non partitioned data, specifying the size of the row group
df.to_parquet(path=PATH_TO_PARQUET_FILE, row_group_size=1000000)

# reading non partitioned data and filtering by row groups only
df = pd.read_parquet(path=PATH_TO_PARQUET_FILE, filters=[('state','==','California')])

Viewing the metadata inside Parquet files can be done using PyArrow.

>>> import pyarrow.parquet as pq

>>> parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)

>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x125b21220>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 6
  num_rows: 1000000
  num_row_groups: 10
  format_version: 2.6
  serialized_size: 9325

>>> parquet_file.metadata.row_group(0).column(3)
<pyarrow._parquet.ColumnChunkMetaData object at 0x125b5b180>
  file_offset: 1675616
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 100000
  path_in_schema: state
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x115283590>
      has_min_max: True
      min: Alabama
      max: Wyoming
      null_count: 0
      distinct_count: 0
      num_values: 100000
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 1599792
  data_page_offset: 1600354
  total_compressed_size: 75824
  total_uncompressed_size: 75891

Notice that the number of row groups is part of the file-level metadata, while the minimum and maximum values appear in the statistics section of each column within each row group.
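
As a small illustration, here is a minimal sketch that loops over all row groups and prints the min/max statistics of the 'state' column (column index 3, as in the output above):

import pyarrow.parquet as pq

# print the min/max of the 'state' column (index 3) for every row group
parquet_file = pq.ParquetFile(PATH_TO_PARQUET_FILE)
for i in range(parquet_file.metadata.num_row_groups):
    stats = parquet_file.metadata.row_group(i).column(3).statistics
    print(f"row group {i}: min={stats.min}, max={stats.max}")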

However, there is a method to further harness this Parquet feature for even more optimized results: sorting.

Filtering by sorted fields

As mentioned in the previous section, part of the metadata stored by Parquet includes the minimum and maximum values for each field within every row group. When the data is sorted based on the field we intend to filter by, Pandas has a greater likelihood of skipping more row groups.

For example, let's consider a dataset that includes a list of records, with one of the fields representing 'state'. If the records are unsorted, there's a good chance that each state appears in most of the row groups. Looking at the metadata in the previous section, you can see that the first row group alone spans all the states from 'Alabama' to 'Wyoming'.

However, if we sort the data based on the ‘state’ field, there’s a significant probability of skipping many row groups.

df = pd.DataFrame(data)

# sorting the data based on 'state'
df.sort_values("state").to_parquet(path=NON_PARTITIONED_SORTED_PATH)

Now let's look again at the metadata and see how it changed:

>>> parquet_file = pq.ParquetFile(NON_PARTITIONED_SORTED_PATH)

>>> parquet_file.metadata.row_group(0).column(3).statistics.min
'Alabama'
>>> parquet_file.metadata.row_group(0).column(3).statistics.max
'Kentucky'


>>> parquet_file.metadata.row_group(1).column(3).statistics.min
'Kentucky'
>>> parquet_file.metadata.row_group(1).column(3).statistics.max
'North Dakota'


>>> parquet_file.metadata.row_group(2).column(3).statistics.min
'North Dakota'
>>> parquet_file.metadata.row_group(2).column(3).statistics.max
'Wyoming'

As you can see, after sorting by state the min/max values are affected accordingly: each row group holds only part of the states instead of all of them. This means reading with filters should be a lot quicker now.

Now let's see how it affects the performance of reading the data. The code for reading the data hasn't changed.

# reading non partitioned data and filtering by row groups, the input is sorted by state
start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, filters=[('state','==','California')])
runtime = (time.time()) - start_time # 0.24 seconds

Astonishingly, the performance here is almost as good as with partitions.

This principle applies to both partitioned and non-partitioned data, so we can use both methods at the same time. If we sometimes want to filter the data based on field A and other times based on field B, then partitioning by field A and sorting by field B could be a good option.
In other cases, for instance where the field we want to filter by has high cardinality, we can partition by a hash of the value (bucketing) and sort the data inside each partition by the actual value of the field. This way we enjoy the advantages of both methods, partitioning and row groups, as shown in the sketch below.
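
Here is a minimal sketch of the bucketing idea, assuming an illustrative bucket count of 16, a hypothetical BUCKETED_PATH, and the 'city' field as the high-cardinality column; a stable hash (crc32) is used so the same bucket can be recomputed at read time:

import zlib
import pandas as pd

NUM_BUCKETS = 16  # illustrative bucket count

def bucket_of(value):
    # stable hash, so the same value maps to the same bucket at read time
    return zlib.crc32(value.encode()) % NUM_BUCKETS

df = pd.DataFrame(data)
df['city_bucket'] = df['city'].map(bucket_of)

# partition by the bucket, sort by the actual value inside each bucket
df.sort_values(['city_bucket', 'city']).to_parquet(
    path=BUCKETED_PATH, partition_cols=['city_bucket'])

# reading: the bucket filter prunes partition directories,
# the 'city' filter skips row groups thanks to the sort
city = 'Springfield'
df = pd.read_parquet(
    path=BUCKETED_PATH,
    filters=[('city_bucket', '==', bucket_of(city)), ('city', '==', city)])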

Reading a subset of the columns

Although less commonly used, another method for achieving better results during data retrieval is selecting only the specific fields that are essential for your task. This strategy can yield noticeable improvements in performance due to the nature of the Parquet format: Parquet is a columnar format, meaning it stores the data column by column inside each row group, so reading only some of the columns means the other columns are skipped entirely.

start_time = time.time()
df = pd.read_parquet(path=NON_PARTITIONED_SORTED_PATH, columns=["name", "state"])
runtime = (time.time()) - start_time # 0.08 seconds

Unsurprisingly, the improvement in performance is great.
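
Column pruning also combines naturally with the filters shown earlier; here is a minimal sketch reusing the sorted file and the same filter from above:

# read only two columns and filter by row groups at the same time
df = pd.read_parquet(
    path=NON_PARTITIONED_SORTED_PATH,
    columns=["name", "state"],
    filters=[('state', '==', 'California')])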

Conclusion

While partitioning data is typically the optimal approach, it is not always possible. Sorting the data can lead to significant improvements, since it lets us skip more row groups. Additionally, if feasible, selecting only the necessary columns is always a good choice.

I hope this post helped you understand how to harness the power of Parquet and Pandas for better performance.
Here is a script containing all the previously mentioned examples, complete with time comparisons.
