ETL Pipeline with AWS Glue and PySpark: A Hands-on PoC
1. Set Up AWS Glue Resources
Create S3 Buckets:
- Go to the S3 console and create two S3 buckets:
  - Source Bucket (for the input data).
  - Target Bucket (for the transformed data).
Upload a CSV Dataset:
- Upload a sample dataset (e.g., a CSV file) into your source bucket. You can use a public dataset or generate a simple dataset like employee data (e.g., employee_id, name, age, salary).
- Tip: You can generate and download a realistic sample CSV from https://www.mockaroo.com/. A minimal boto3 sketch for this setup follows below.
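If you prefer scripting this setup, here is a minimal boto3 sketch; the bucket names, region, and file name are placeholders, and it assumes your AWS credentials are already configured:

import boto3

s3 = boto3.client("s3", region_name="us-east-1")

# Create the source and target buckets (names must be globally unique).
# Outside us-east-1, also pass CreateBucketConfiguration={"LocationConstraint": "<region>"}.
for bucket in ["your-source-bucket", "your-target-bucket"]:
    s3.create_bucket(Bucket=bucket)

# Upload the sample CSV (e.g., the file downloaded from Mockaroo) to the source bucket.
s3.upload_file("MOCK_DATA.csv", "your-source-bucket", "employee_data/MOCK_DATA.csv")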
2. Create an AWS Glue Crawler
- Purpose: To scan the source data and register its schema as a table in the Glue Data Catalog.
- Steps:
- Go to the AWS Glue console and click on “Crawlers.”
- Click “Add Crawler” and give it a name (e.g., employee_data_crawler).
- Configure the data source as your S3 source bucket containing the CSV file.
- Select an existing or create a new Glue Data Catalog database.
- Run the crawler to detect the schema and add it to the Data Catalog.
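For reference, the crawler can also be created and started programmatically. This is a hedged boto3 sketch; the role ARN, database name, and bucket path are placeholders you would replace:

import boto3

glue = boto3.client("glue")

# Create a crawler that scans the source bucket and registers the schema in the Data Catalog.
glue.create_crawler(
    Name="employee_data_crawler",
    Role="arn:aws:iam::123456789012:role/YourGlueServiceRole",  # placeholder role ARN
    DatabaseName="your_database",
    Targets={"S3Targets": [{"Path": "s3://your-source-bucket/employee_data/"}]},
)

# Run the crawler; when it finishes, the table appears in the Glue Data Catalog.
glue.start_crawler(Name="employee_data_crawler")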
3. Create an ETL Job Using PySpark
- Create a Glue Job:
- Go to the Glue console and select “Jobs.”
- Click “Add Job” and give it a name (e.g., transform_employee_data).
- Choose an IAM role that has permissions for AWS Glue and for both S3 buckets.
- Select “Spark” as the execution type and “Python” as the script type.
- Configure the job to use a Glue Data Catalog table (from the crawler).
- Write the transformation script using PySpark.
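If you would rather define the job from code instead of the console, a hedged boto3 sketch could look like this; the script location, role ARN, and Glue version are placeholders you would adjust:

import boto3

glue = boto3.client("glue")

# Register the ETL job; the PySpark script itself is uploaded to S3 beforehand.
glue.create_job(
    Name="transform_employee_data",
    Role="arn:aws:iam::123456789012:role/YourGlueServiceRole",  # placeholder role ARN
    Command={
        "Name": "glueetl",  # Spark ETL job type
        "ScriptLocation": "s3://your-scripts-bucket/transform_employee_data.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    DefaultArguments={"--job-language": "python"},
)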
4. Write the PySpark ETL Script
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Initialize GlueContext
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Load data from Glue Data Catalog
datasource = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name="your_table")
# Transform the data (e.g., filter rows where gender is 'Male')
transformed_data = Filter.apply(frame=datasource, f=lambda x: x["gender"] == "Male")
# Map transformations (e.g., create full_name and email domain columns)
transformed_data = transformed_data.map(lambda row: {
    **row,
    "full_name": f"{row['first_name']} {row['last_name']}",  # Create full_name column
    "email_domain": row['email'].split('@')[-1],  # Extract email domain
})
# Convert back to a Spark DataFrame
df = transformed_data.toDF()
# Write the transformed data to S3 in Parquet format
df.write.mode("overwrite").parquet("s3://your-target-bucket/employee_data_transformed/")
# Complete the job
job.commit()
- Datasource: Extracts data from the Glue Data Catalog.
- Transformation:
  - Filter Transformation: Filters rows where gender == 'Male'. You can change this condition as per your requirements.
  - Map Transformation: Adds a new column full_name by combining first_name and last_name, and extracts the domain part of the email (after the @) into a new column called email_domain.
- Columns Used: The code assumes the presence of the columns id, first_name, last_name, email, gender, and ip_address. Adjust the logic inside the map function as needed for other transformations.
- Load: Saves the transformed data to the target S3 bucket in Parquet format.
5. Configure Job Output
- Set up the target bucket path (e.g., s3://your-target-bucket/employee_data_transformed/) to store the transformed data.
- Specify the file format (e.g., Parquet or CSV) for the output; a sketch of switching between the two follows below.
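As mentioned in the step above, here is a hedged sketch that uses Glue's write_dynamic_frame sink instead of the DataFrame writer from the script; it assumes the DynamicFrame transformed_data from that script, and the output paths are placeholders:

# Write the transformed DynamicFrame to the target bucket as Parquet...
glueContext.write_dynamic_frame.from_options(
    frame=transformed_data,
    connection_type="s3",
    connection_options={"path": "s3://your-target-bucket/employee_data_transformed/"},
    format="parquet",
)

# ...or as CSV, if you prefer a plain-text output.
glueContext.write_dynamic_frame.from_options(
    frame=transformed_data,
    connection_type="s3",
    connection_options={"path": "s3://your-target-bucket/employee_data_csv/"},
    format="csv",
)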
6. Run the AWS Glue Job
- Go to the Glue console and start the job.
- Monitor the job execution from the AWS Glue Job Run History.
- Once the job completes, go to your target S3 bucket and verify the transformed data.
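The run can also be started and monitored from code. A hedged boto3 sketch, assuming the job name used earlier:

import time
import boto3

glue = boto3.client("glue")

# Start the job and capture the run id.
run_id = glue.start_job_run(JobName="transform_employee_data")["JobRunId"]

# Poll until the run reaches a terminal state.
while True:
    state = glue.get_job_run(JobName="transform_employee_data", RunId=run_id)["JobRun"]["JobRunState"]
    print("Job state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)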
7. Verify the Transformed Data
- Download the files from your target S3 bucket and inspect them using any tool (e.g., Amazon Athena, Excel, or Pandas).
- Optionally, run queries using Amazon Athena on the output data in S3.
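For a quick local inspection with Pandas, a hedged sketch; it assumes pyarrow and s3fs are installed so pandas can read Parquet directly from S3:

import pandas as pd

# Read the Parquet output straight from the target bucket and inspect it.
df = pd.read_parquet("s3://your-target-bucket/employee_data_transformed/")
print(df.head())
print(df["email_domain"].value_counts().head(10))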
8. (Optional) Add Logging and Error Handling
- Add logging to monitor and debug the Glue job, using AWS CloudWatch for detailed logs.
- Implement error handling mechanisms in the PySpark script for better control of failures.
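One possible pattern, sketched here under the assumption that it sits inside the Glue script after job.init(...), is to wrap the transform-and-write steps in a try/except and log through the Glue logger so messages and failures show up in the job's CloudWatch logs:

# Inside the Glue script, after job.init(...):
logger = glueContext.get_logger()

try:
    logger.info("Starting transformation of employee data")
    transformed_data = Filter.apply(frame=datasource, f=lambda x: x["gender"] == "Male")
    df = transformed_data.toDF()
    df.write.mode("overwrite").parquet("s3://your-target-bucket/employee_data_transformed/")
    logger.info("Wrote transformed data to the target bucket")
except Exception as e:
    # Log the failure so it is visible in CloudWatch, then re-raise to fail the job run.
    logger.error("ETL job failed: " + str(e))
    raise
job.commit()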
Steps to Query Parquet Data in S3 using Athena:
1. Open Amazon Athena Console
- Navigate to the Amazon Athena console from your AWS Management Console.
- Make sure you are in the same region where your data is stored in S3.
2. Set Up Athena Query Result Location
- Before running any queries, you must set the output location for Athena query results.
- Click on the “Settings” (gear icon in the top-right corner).
- In the Query result location field, choose or specify an S3 bucket where the Athena query results will be stored (e.g., s3://your-query-results-bucket/).
- Save the settings.
3. Run a Query
- Run a query against a table registered in the Glue Data Catalog. To query the transformed Parquet output, first run a crawler over the target prefix so it appears as its own table. For example:
SELECT *
FROM your_database.your_table
LIMIT 10;
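The same query can also be issued programmatically; a hedged boto3 sketch, where the database, table, and results bucket are placeholders matching the settings above:

import time
import boto3

athena = boto3.client("athena")

# Submit the query; results land in the configured output location.
query_id = athena.start_query_execution(
    QueryString="SELECT * FROM your_database.your_table LIMIT 10",
    QueryExecutionContext={"Database": "your_database"},
    ResultConfiguration={"OutputLocation": "s3://your-query-results-bucket/"},
)["QueryExecutionId"]

# Wait for the query to finish, then fetch and print the result rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
for row in rows:  # the first row is the header
    print([field.get("VarCharValue") for field in row["Data"]])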
9. Clean Up Resources
- Delete the Glue job, crawler, and S3 buckets if not needed further.
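A hedged boto3 sketch for the cleanup, using the resource names from this PoC (adjust or skip anything you want to keep):

import boto3

glue = boto3.client("glue")
s3 = boto3.resource("s3")

# Remove the Glue job and crawler created for this PoC.
glue.delete_job(JobName="transform_employee_data")
glue.delete_crawler(Name="employee_data_crawler")

# Empty and delete the buckets (a bucket must be empty before it can be deleted).
for name in ["your-source-bucket", "your-target-bucket"]:
    bucket = s3.Bucket(name)
    bucket.objects.all().delete()
    bucket.delete()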