Fine-Tuning Glue Export File Size for Athena Queries
Introduction
If you are building datalakes on AWS, you are most likely using AWS Athena to give your organization a way to analyze vast amounts of data residing in Amazon S3 with lightning-fast speed and efficiency.
However, the performance of Athena queries is significantly impacted by the size and organization of the underlying data files. Small files can lead to increased latency and higher costs due to the nature of how Athena processes data.
As we already know, Athena charges us based on the amount of data scanned per query. There are additional costs as well: since the data we scan resides in S3, we are also charged standard S3 rates for storage, requests, and data transfer.
We can save up to 90% per query and get better performance by compressing, partitioning, and converting the data into columnar formats such as Parquet.
If our files are too small, the execution engine will spend additional time on the overhead of listing directories, opening S3 objects, fetching object metadata, setting up transfers, reading file headers, etc. This significantly impacts both the time and the cost of Athena queries.
Small files can even carry more metadata than actual data, which increases the total size and the processing overhead.
The benefits of larger files include faster listings, fewer Amazon S3 requests, less metadata to manage, and parallel processing if the file format is splittable. However, files that are too large are not ideal either; the optimal size is somewhere between 200 MB and 1 GB.
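A quick way to check whether a table's prefix is already in the small-file regime is to list the objects and look at their sizes. A minimal sketch using boto3; the bucket name and prefix are placeholders:

```python
import boto3

# Rough audit of object sizes under a prefix, to see whether we are in the
# "many small files" regime. Bucket and prefix are placeholders.
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

sizes = []
for page in paginator.paginate(Bucket="<bucket>", Prefix="datalake/table/"):
    for obj in page.get("Contents", []):
        sizes.append(obj["Size"])

print(f"files: {len(sizes)}")
if sizes:
    print(f"average size: {sum(sizes) / len(sizes) / 1024 / 1024:.1f} MiB")
```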
AWS gives an example to illustrate the impact of small files:
| Query | Number of Files | Runtime |
|---|---|---|
| SELECT count(*) FROM lineitem | 100,000 files | 13 seconds |
| SELECT count(*) FROM lineitem | 1 file | 1.3 seconds |
| Speedup | | ~90% |
For more information, please see Top 10 Performance Tuning Tips for Amazon Athena
If you are using AWS Glue to process data and export it to a Glue table, i.e. an S3 datalake, you are probably familiar with the issue of a large number of small files. In this post, I’ll cover ways of controlling the number and size of export files in Glue, as well as methods of merging small files in a post-processing stage.
I hope you’ll find this post useful and as always, please feel free to reach out if you have any additional questions and suggestions.
Partitions
As we already know, the main idea of parallel processing is to split the input data into multiple parts, distribute them across different nodes, and process them in parallel. This is essential for all distributed systems for large-scale data processing. In Apache Spark, partitions play a crucial role in determining how data is distributed and processed across the cluster.
A partition is the basic unit of data distribution and represents a portion of a larger dataset that is processed by a single task. The partition count is influenced by several factors, and selecting the most suitable number of partitions can be a challenge. Achieving the ideal partitioning for a particular use case typically requires investigating and understanding the input data and the resources at hand. Some of the factors to consider when determining the right number of partitions include:
- Input data source - for example HDFS block size
- Size and number of input files
- Resources and cluster configuration - number of executor nodes, CPU cores per node, RAM, etc.
The idea of increasing the number of partitions is to get better parallelism, enabling efficient utilization of the available resources and faster execution. However, having too many partitions can lead to worse performance due to increased task scheduling and communication overhead, as the driver needs to manage and coordinate tasks across these partitions and nodes.
Fine-tuning the cluster performance is a huge topic and getting the right configuration depends on a particular use case. In this post, we are only interested in how the configuration parameters influence the number of export files.
As the data gets partitioned and processed on different nodes, each node will output a part of the data. Each of these parts ends up as a separate file, and as a result we can have a large number of small files, each representing a single part of the output data. By default, when writing data to the output location, each partition of an RDD or DataFrame is written as a separate file, resulting in as many output files as there are partitions.
If we don’t explicitly specify the number of partitions, Spark will determine the number of partitions on read using the spark.default.parallelism configuration parameter. This parameter is the default parallelism level, representing the number of partitions that Spark will use for distributed processing. The number of available CPU cores and worker nodes in the cluster significantly affects the optimal value for this parameter.
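To see how many partitions Spark actually created on read, we can inspect the DataFrame directly. A minimal sketch, assuming a SparkSession named spark is available and with a placeholder S3 path:

```python
# Read Parquet data from S3 and check how many partitions Spark created.
df = spark.read.parquet("s3://<input-bucket>/data/")

# Number of in-memory partitions on read; this is what ultimately drives
# the number of output part files if we write without repartitioning.
print(df.rdd.getNumPartitions())

# Default parallelism derived from the cluster configuration.
print(spark.sparkContext.defaultParallelism)
```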
Let’s see an example. Running a Glue job with:
- Requested number of workers = 100 and
- Worker Type = G.1X (4 vCPU and 16 GB RAM)
gives the following configuration:
{'aws_glue_job_id': '<id>',
'context':
{'configuration':
[
...
['spark.driver.cores', '4'],
['spark.default.parallelism', '396'],
['spark.executor.instances', '99'],
['spark.dynamicAllocation.maxExecutors', '99'],
['spark.sql.shuffle.partitions', '396'],
['spark.glue.GLUE_VERSION', '3.0'],
['spark.executor.cores', '4'],
...
]
},
...
}
To get all parameters, use:
spark_configurations = spark.sparkContext.getConf().getAll()
If you are wondering why 99: one node is always reserved for the driver, which is not used for processing.
This is an optimal configuration since each worker has 4 CPU cores and each partition gets one core on one worker, i.e. it can be processed independently in parallel. However, some partitions can be larger than others, which affects the efficiency of data processing; repartitioning ensures that the data is uniformly distributed.
“Stragglers” are tasks taking longer than others, for example due to an unbalanced amount of data between tasks.
Small files on read
Having hundreds of small input files that are only a few kilobytes can significantly impact the performance of a Glue job. As we’ve seen, the number and size of input files affect the number of partitions on read.
We can improve the data processing speed by defining groupings of input files, which enables tasks to read a group of input files into a single in-memory partition. Grouping can be configured using the following parameters (an example read follows the list):
- groupFiles - set this parameter to inPartition to enable grouping of files. This is automatically set by AWS Glue if there are more than 50,000 input files.
- groupSize - the target size of groups in bytes. This parameter is optional; if not provided, AWS Glue calculates a size that uses all the CPU cores in the cluster while still reducing the overall number of ETL tasks and in-memory partitions.
Please see Reading input files in larger groups for more information.
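As an illustration, a grouped read might look like the following sketch; the path and group size are placeholders, and it assumes the usual GlueContext setup shown later in this post:

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Group small input files into ~128 MB in-memory partitions on read.
# groupSize is specified in bytes, as a string.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://<input-bucket>/table/"],
        "groupFiles": "inPartition",
        "groupSize": "134217728",  # 128 MB
    },
    format="parquet",
)
```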
Since this configuration affects partitions on read, it will also influence the number of exported files. As always, experimenting with different configuration parameters will give us a better sense of what works best in our particular use case.
Repartitioning
Repartitioning means changing the data distribution among nodes within a cluster. This procedure can serve as a double-edged sword. When executed effectively, it can increase the performance and efficiency of Spark jobs. Conversely, if not handled appropriately, it introduces extra load on the entire cluster and significantly affects the job’s duration.
Since repartitioning redistributes the data, we usually want to perform it when we have some imbalance in data distribution, for example:
- If our data is skewed, meaning that some keys have significantly more data than the others i.e. imbalanced data partitions
- Before performing joins, aggregations, and groupings. Especially if join keys have imbalanced data
- Merging DataFrames with significantly different sizes and skewness in their data distribution etc.
There are multiple ways to repartition the data:
- repartition
- coalesce
- repartitionByRange
- partitionBy
Understanding the differences between them is essential for picking the right partitioning strategy.
repartition
repartition is a very expensive operation since it invokes a full data shuffle across all nodes in the cluster. It’ll evenly distribute the data and can be used to increase or decrease the number of partitions.
We can define the target number of partitions and/or one or more columns to use for repartitioning:
DataFrame.repartition(numPartitions, *cols)
Using different arguments will use different partitioning strategies under the hood. Let’s see some in practice.
Specifying just a number of partitions uses RoundRobinPartitioning. We can confirm this by examining the physical plan:
df = spark.createDataFrame(data=data, schema=schema)
df = df.repartition(2)
df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM, [id=#6]
+- Scan ExistingRDD[employee_name#0,department#1,state#2,salary#3L,age#4L,bonus#5L]
where data is some test employee data. What’s important is Exchange RoundRobinPartitioning(2), REPARTITION_BY_NUM.
On the other hand, we can specify a single column. Let’s say we want to repartition on department:
df = spark.createDataFrame(data=data, schema=schema)
df = df.repartition("department")
df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(department#1, 200), REPARTITION_BY_COL, [id=#6]
+- Scan ExistingRDD[employee_name#0,department#1,state#2,salary#3L,age#4L,bonus#5L]
It’ll use HashPartitioning. This method divides the data into partitions based on the hash values of specific columns or expressions. As we haven’t specified the number of partitions, Spark will take the default number of shuffle partitions (spark.sql.shuffle.partitions = 200) and assign each record to a partition based on the hash value. Doing so will ensure that the data is evenly distributed across partitions, which allows balanced workloads during processing.
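Note that the default of 200 shuffle partitions can itself be tuned, which directly affects how many files a column-based repartition produces on write. A minimal sketch, with an illustrative value:

```python
# Lower the number of shuffle partitions so that repartitioning by column
# without an explicit count produces fewer, larger output files.
spark.conf.set("spark.sql.shuffle.partitions", "50")
```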
The final option is to use HashPartitioning with a specific number of partitions:
df = spark.createDataFrame(data=data, schema=schema)
df = df.repartition(2, "department")
df.explain()
which gives the following plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(department#1, 2), REPARTITION_BY_NUM, [id=#6]
+- Scan ExistingRDD[employee_name#0,department#1,state#2,salary#3L,age#4L,bonus#5L]
where we can see that hashpartitioning now has 2 as an argument.
coalesce
coalesce is used to reduce the number of partitions; unlike repartition, it cannot be used to increase it. However, coalesce merges existing partitions into a smaller number of partitions without performing a full data shuffle, which makes it more efficient than repartition.
In order to use coalesce we have to specify the target number of partitions:
df.coalesce(numPartitions)
coalesce uses existing partitions to minimize the amount of data that needs to be transferred across the nodes. It’ll move data from some of the nodes and merge it onto others. This process can result in partitions with different amounts of data. Partition imbalance reduces the efficiency of processing and can make some tasks run longer than others. With this in mind, coalesce should be used carefully.
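For example, a common pattern is to coalesce right before the write so that the number of output part files is bounded. A minimal sketch, with placeholder paths and an arbitrary partition count:

```python
# Read the data, then merge the existing partitions down to 4 without a full
# shuffle, so the export produces at most 4 part files.
df = spark.read.parquet("s3://<input-bucket>/table/")

df.coalesce(4).write.mode("overwrite").parquet("s3://<output-bucket>/table/")
```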
repartitionByRange
As we’ve seen, repartition will use HashPartitioner to hash column values and determine the partition. If we have continuous, rather than discrete, column values, such as numbers, we can use repartitionByRange to partition the data based on ranges of the column values.
The actual ranges are determined by sampling the column to estimate them, which makes the result non-deterministic since sampling can return different values. The sample size can be controlled by the configuration parameter spark.sql.execution.rangeExchange.sampleSizePerPartition.
We can specify the target number of partitions and the columns:
DataFrame.repartitionByRange(numPartitions, *cols)
If the number of partitions is not specified, the default number of partitions defined by the configuration parameter spark.sql.shuffle.partitions is used.
More information in the documentation pyspark.sql.DataFrame.repartitionByRange
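Continuing the employee example from above, a sketch of range repartitioning on a numeric column; the column choice and partition count are illustrative:

```python
df = spark.createDataFrame(data=data, schema=schema)

# Partition the rows into 4 ranges based on the salary column; each partition
# will hold a contiguous range of salary values.
df = df.repartitionByRange(4, "salary")
df.explain()
```

The physical plan should now show a rangepartitioning exchange instead of hashpartitioning.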
partitionBy
partitionBy is a method of the DataFrameWriter class which is used to write the DataFrame to disk in partitions, i.e. it partitions the output by the given columns on the filesystem, creating one sub-directory for each unique value of the partition columns.
Documentation pyspark.sql.DataFrameWriter.partitionBy
This is quite different from repartition, which is a DataFrame method used to increase or reduce the number of partitions in memory; when written to disk, it creates all part files in a single directory.
It takes a single or multiple columns:
DataFrameWriter.partitionBy(*cols: Union[str, List[str]]) → pyspark.sql.readwriter.DataFrameWriter
Since the topic of this post is the number of exported files, partitionBy is very important, especially in combination with repartition.
The way it works may be unexpected at first. partitionBy at write time is applied to each partition, since each of the original partitions is written independently. There is an awesome explanation and example provided by conradlee: Difference between df.repartition and DataFrameWriter partitionBy?
TL;DR of the answer: let’s say we have 10 partitions that span 7 days and we want to partitionBy("date"). How many files are we going to get? Well, it depends on the actual partitions. If each partition contains data for all 7 days, we’ll get 70 files. If each partition has data for exactly one day, we’ll get 10 files.
Understanding this is essential for handling partitions in the right way, but also for controlling the number of exported files.
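To put this into practice, a common way to end up with one file per partition value is to repartition in memory by the same column that we pass to partitionBy at write time. A sketch, assuming a date column and a placeholder output path:

```python
# All rows of a given date end up in the same in-memory partition, so each
# date=<value> sub-directory receives a single part file.
(
    df.repartition("date")
    .write
    .mode("overwrite")
    .partitionBy("date")
    .parquet("s3://<output-bucket>/table/")
)
```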
Merging files
Sometimes having a large number of small files is inevitable and we have to define a post-processing step to merge them into an optimal number of files of optimal size.
Merging small files into larger ones in Amazon S3 is a common need to improve performance, reduce costs, and optimize storage. Luckily, AWS already offers several approaches. We’ll explore some of them.
Glue
We can create a simple Glue ETL job which will read all the small files into one DataFrame, which we repartition and export to a new S3 location as a large file.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
df = glueContext.create_dynamic_frame.from_options(
's3',
{'paths': ['s3://<input-bucket>/']},
'parquet',
)
partitioned_df = df.toDF().repartition(1)
partitioned_dynamic_df = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_df")
glueContext.write_dynamic_frame.from_options(
frame=partitioned_dynamic_df,
connection_type='s3',
connection_options={'path': 's3://<output-bucket>/'},
format='parquet',
)
In this example we are creating only 1 partition. However, if the DataFrame is very large, this process can fail with an out-of-memory error. One way to handle this is to determine the optimal number of partitions based on the total size of the input files and the desired target size of the output files.
The following equation can give us a rough estimate:
number of partitions = is / ts
where is and ts are the input and target size, respectively.
For example, if we have 1 GB of input data and we want output files of 10 MB, the target number of partitions will be 100.
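A minimal sketch of that estimate in code, reusing the DynamicFrame df from the Glue example above; the function name is an assumption, and the input size could for example be summed from an S3 listing:

```python
import math

def estimate_partitions(input_size_bytes, target_file_size_bytes):
    # Round up so the resulting files stay at or below the target size.
    return max(1, math.ceil(input_size_bytes / target_file_size_bytes))

# 1 GB of input with a 10 MB target file size -> 100 partitions.
num_partitions = estimate_partitions(1_000_000_000, 10_000_000)
partitioned_df = df.toDF().repartition(num_partitions)
```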
For more information see AWS Glue FAQ, or How to Get Things Done
Lambda
Depending on the number and size of files, we can use a Lambda function to merge multiple small files into one.
There are multiple ways to achieve this. The one that I find easiest and most efficient is leveraging AWSWrangler, i.e. the AWS SDK for Pandas.
Please see AWS SDK for Pandas - Quick Start for more information
AWS already offers an official Lambda layer for AWSWrangler, so implementing it in existing Lambda functions is pretty straightforward. The library also has built-in methods for reading multiple files into one dataframe, which allows us to export it as a single file. The following code should give you an idea of how to do that:
import awswrangler as wr
def lambda_handler(event, context):
    # Gather all Parquet parts under the prefix (wildcards are supported).
    s3_objects = wr.s3.list_objects('s3://<bucket>/*.parquet')
    # Read them into a single DataFrame and write it back as one file.
    dataframe = wr.s3.read_parquet(s3_objects)
    wr.s3.to_parquet(dataframe, 's3://<output_bucket>/merged.parquet')
    return {
        'statusCode': 200,
        'body': "Success"
    }
Note that list_objects supports Unix shell-style wildcards in the path argument. It gives us a simple way to gather all Glue export parts, which are divided into multiple Parquet files, usually named part-0000X-<hash>.snappy.parquet.
Typically, Lambda is a quick and straightforward fix, but it is crucial to confirm that our use case fits within Lambda’s time and memory constraints.
EMR - S3DistCp
Apache DistCp is an open-source tool you can use to copy large amounts of data. S3DistCp is similar to DistCp, but optimized to work with AWS, particularly Amazon S3.
For more information see documentation S3DistCp
We can use S3DistCp to concatenate files that match an expression using the option --groupBy=PATTERN. For example, we could combine multiple log files into a single file per day or hour.
The target size is defined by specifying --targetSize=SIZE in mebibytes (MiB). This size defines the size of a whole group based on --groupBy. If the concatenated group is larger than the target size, it’ll be broken into multiple files, named sequentially with a numeric value.
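One way to run S3DistCp on an existing EMR cluster is to submit it as a step, for example via boto3. A sketch; the cluster id, paths, and the groupBy pattern are placeholders:

```python
import boto3

emr = boto3.client("emr")

# Submit an S3DistCp step that groups daily log files (matched by the date in
# the key) and merges each group into files of up to 512 MiB.
emr.add_job_flow_steps(
    JobFlowId="<cluster-id>",
    Steps=[
        {
            "Name": "merge-small-files",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "s3-dist-cp",
                    "--src=s3://<input-bucket>/logs/",
                    "--dest=s3://<output-bucket>/merged/",
                    "--groupBy=.*(\\d{4}-\\d{2}-\\d{2}).*",
                    "--targetSize=512",
                ],
            },
        }
    ],
)
```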
One important thing is that S3DistCp doesn’t support Parquet files. This is a huge limitation since Parquet is the preferred file format in datalakes. AWS recommends using PySpark instead.