Jeeva-AWSLabsJourney
3 min read · Sep 23, 2024

ETL Pipeline with AWS Glue and PySpark: A Hands-on PoC

1. Setup AWS Glue Resources

Create S3 Buckets:

  • Go to the S3 console and create two S3 buckets:
  1. Source Bucket (for input data).
  2. Target Bucket (for the transformed data).
Upload a CSV Dataset:

  • Upload a sample dataset (e.g., a CSV file) into your source bucket. You can use a public dataset or generate a simple employee dataset; the script later in this PoC expects columns such as id, first_name, last_name, email, gender, and ip_address.

Note: You can generate and download a sample dataset from https://www.mockaroo.com/. A boto3 sketch for creating the buckets and uploading the file follows below.
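If you prefer to script this first step instead of using the console, a minimal boto3 sketch looks like the following; the bucket names, region, and local file path are placeholders, not values prescribed by this PoC.

import boto3

REGION = "us-east-1"  # placeholder region
s3 = boto3.client("s3", region_name=REGION)

# Create the source and target buckets (names must be globally unique)
for bucket in ["your-source-bucket", "your-target-bucket"]:
    s3.create_bucket(Bucket=bucket)  # outside us-east-1, also pass CreateBucketConfiguration={"LocationConstraint": REGION}

# Upload the sample CSV into the source bucket
s3.upload_file("employee_data.csv", "your-source-bucket", "input/employee_data.csv")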

2. Create AWS Glue Crawler

  • Purpose: To scan the source data and register its schema as a table in the Glue Data Catalog.
  • Steps:
  1. Go to the AWS Glue console and click on “Crawlers.”
  2. Click “Add Crawler,” and give it a name (e.g., employee_data_crawler).
  3. Configure the data source as your S3 source bucket containing the CSV file.
  4. Select an existing or create a new Glue Data Catalog database.
  5. Run the crawler to detect the schema and add it to the Data Catalog (an equivalent boto3 sketch follows below).
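The crawler can also be created and started programmatically. A hedged boto3 sketch, where the region, IAM role ARN, database name, and S3 path are placeholders for your own values:

import boto3

glue = boto3.client("glue", region_name="us-east-1")  # placeholder region

# Define a crawler that scans the source bucket prefix
glue.create_crawler(
    Name="employee_data_crawler",
    Role="arn:aws:iam::123456789012:role/your-glue-role",  # placeholder IAM role
    DatabaseName="your_database",                          # Glue Data Catalog database
    Targets={"S3Targets": [{"Path": "s3://your-source-bucket/input/"}]},
)

# Run it; the detected schema is registered as a table in that database
glue.start_crawler(Name="employee_data_crawler")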

3. Create an ETL Job Using PySpark

  • Create a Glue Job:
  1. Go to the Glue console and select “Jobs.”
  2. Click “Add Job” and give it a name (e.g., transform_employee_data).
  3. Choose an IAM role that grants Glue access to both S3 buckets (create one if you have not already).
  4. Select “Spark” as the execution type and “Python” as the script type.
  5. Configure the job to use a Glue Data Catalog table (from the crawler).
  6. Write the transformation script using PySpark (the script and an optional boto3 job-definition sketch follow below).
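For reference, the same job definition can be created with boto3. This is only a sketch: the script location, IAM role ARN, region, and Glue version below are assumptions you would replace with your own values.

import boto3

glue = boto3.client("glue", region_name="us-east-1")  # placeholder region

glue.create_job(
    Name="transform_employee_data",
    Role="arn:aws:iam::123456789012:role/your-glue-role",  # placeholder IAM role
    Command={
        "Name": "glueetl",  # Spark job type
        "ScriptLocation": "s3://your-scripts-bucket/transform_employee_data.py",  # placeholder script path
        "PythonVersion": "3",
    },
    GlueVersion="4.0",   # assumption: any recent Glue version works for this PoC
    WorkerType="G.1X",
    NumberOfWorkers=2,
)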

4. Write the PySpark ETL Script

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize GlueContext
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load data from Glue Data Catalog
datasource = glueContext.create_dynamic_frame.from_catalog(database="your_database", table_name="your_table")

# Transform the data (e.g., filter rows where gender is 'Male')
transformed_data = Filter.apply(frame=datasource, f=lambda x: x["gender"] == "Male")

# Map transformations (e.g., create full_name and email_domain columns)
def enrich(record):
    record["full_name"] = f"{record['first_name']} {record['last_name']}"  # Combine first and last name
    record["email_domain"] = record["email"].split("@")[-1]                # Keep the part after '@'
    return record

transformed_data = Map.apply(frame=transformed_data, f=enrich)

# Convert back to a Spark DataFrame
df = transformed_data.toDF()

# Write the transformed data to S3 in Parquet format
df.write.mode("overwrite").parquet("s3://your-target-bucket/employee_data_transformed/")

# Complete the job
job.commit()
  • Datasource: Extracts the table registered by the crawler from the Glue Data Catalog.
  • Transformation:
  1. Filter: Keeps only the rows where gender == 'Male'. You can change this condition as per your requirements.
  2. Map: The enrich function adds a new column full_name by combining first_name and last_name, and extracts the domain part of the email (after the @) into a new column called email_domain.
  • Columns Used: The script assumes the presence of the columns id, first_name, last_name, email, gender, and ip_address. Adjust the logic inside the enrich function as needed for other transformations.
  • Load: Saves the transformed data to the target S3 bucket in Parquet format (an alternative Glue-native write is sketched below).
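If you would rather keep the load step in the Glue DynamicFrame API instead of the plain Spark writer, a roughly equivalent sketch (using the same placeholder bucket path as above) is:

from awsglue.dynamicframe import DynamicFrame

# Convert the Spark DataFrame back to a DynamicFrame and write it with the Glue S3 sink
output_dyf = DynamicFrame.fromDF(df, glueContext, "output_dyf")
glueContext.write_dynamic_frame.from_options(
    frame=output_dyf,
    connection_type="s3",
    connection_options={"path": "s3://your-target-bucket/employee_data_transformed/"},
    format="parquet",
)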

5. Configure Job Output

  • Set up the target bucket path (e.g., s3://your-target-bucket/employee_data_transformed/) to store the transformed data.
  • Specify the file format (e.g., Parquet or CSV) for the output, as in the sketch below.
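For example, the write step in the script above can be switched between formats; both paths are placeholders:

# Parquet output (columnar and compressed), as used in this PoC
df.write.mode("overwrite").parquet("s3://your-target-bucket/employee_data_transformed/")

# CSV output instead; header=True keeps the column names in each file
df.write.mode("overwrite").option("header", True).csv("s3://your-target-bucket/employee_data_transformed_csv/")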

6. Run the AWS Glue Job

  • Go to the Glue console and start the job (or start it from code, as sketched below).
  • Monitor the job execution from the AWS Glue Job Run History.
  • Once the job completes, go to your target S3 bucket and verify the transformed data.
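The job can also be started and monitored from code. A hedged boto3 sketch, assuming the job name used earlier and a placeholder region:

import time
import boto3

glue = boto3.client("glue", region_name="us-east-1")  # placeholder region

# Kick off a run of the job created earlier
run_id = glue.start_job_run(JobName="transform_employee_data")["JobRunId"]

# Poll until the run reaches a terminal state
while True:
    state = glue.get_job_run(JobName="transform_employee_data", RunId=run_id)["JobRun"]["JobRunState"]
    print("Job state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
        break
    time.sleep(30)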

7. Verify the Transformed Data

  • Download the files from your target S3 bucket and inspect them with a tool that can read Parquet (e.g., Amazon Athena or Pandas with pyarrow); a Pandas sketch follows below.
  • Optionally, run queries using Amazon Athena on the output data in S3.
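For a quick local check, Pandas can read the Parquet output straight from S3. This assumes pandas, pyarrow, and s3fs are installed and AWS credentials are configured; the path is the same placeholder as above:

import pandas as pd

# Reads all Parquet files under the prefix via s3fs
df = pd.read_parquet("s3://your-target-bucket/employee_data_transformed/")

print(df.head())
print(df["email_domain"].value_counts().head())  # column added by the Map step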

8. (Optional) Add Logging and Error Handling

  • Add logging to monitor and debug the Glue job, using AWS CloudWatch for detailed logs.
  • Implement error-handling mechanisms in the PySpark script for better control over failures, as sketched below.
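One simple pattern, sketched here only: use Python's standard logging (Glue forwards driver logs to CloudWatch Logs) and wrap the transform and write steps so failures are logged before the job run is marked as failed.

import logging

# Glue ships stdout/stderr and Python logging output to CloudWatch Logs
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("transform_employee_data")

try:
    logger.info("Starting transformation")
    # ... filter / map / write steps from the script above ...
    logger.info("Transformation finished, committing job")
    job.commit()
except Exception:
    logger.exception("Glue job failed")
    raise  # re-raise so the job run is marked FAILED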

Steps to Query Parquet Data in S3 using Athena:

1. Open Amazon Athena Console

  • Navigate to the Amazon Athena console from your AWS Management Console.
  • Make sure you are in the same region where your data is stored in S3.

2. Set Up Athena Query Result Location

  • Before running any queries, you must set the output location for Athena query results.
  1. Click on the “Settings” (gear icon in the top-right corner).
  2. In the Query result location field, choose or specify an S3 bucket where the Athena query results will be stored (e.g., s3://your-query-results-bucket/).
  3. Save the settings.
3. Run a Query

  • Create a table over the transformed data (for example, by running another Glue crawler on the target bucket), then run a query, either from the query editor or programmatically as sketched below:

SELECT *
FROM your_database.your_table
LIMIT 10;
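If you want to run the same query from code instead of the query editor, a minimal boto3 sketch, assuming the placeholder database, table, and query-results bucket used above:

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")  # placeholder region

# Submit the query; results land in the query-results bucket configured above
query_id = athena.start_query_execution(
    QueryString="SELECT * FROM your_table LIMIT 10;",
    QueryExecutionContext={"Database": "your_database"},
    ResultConfiguration={"OutputLocation": "s3://your-query-results-bucket/"},
)["QueryExecutionId"]

# Wait for the query to finish, then print the first page of results
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])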

9. Clean Up Resources

  • Delete the Glue job, crawler, and S3 buckets if they are no longer needed; a boto3 cleanup sketch follows below.
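A hedged boto3 cleanup sketch, assuming the resource names used throughout this PoC; note that S3 buckets must be emptied before they can be deleted:

import boto3

glue = boto3.client("glue", region_name="us-east-1")  # placeholder region
s3 = boto3.resource("s3", region_name="us-east-1")

# Remove the Glue job and crawler created for this PoC
glue.delete_job(JobName="transform_employee_data")
glue.delete_crawler(Name="employee_data_crawler")

# Empty and delete the buckets (irreversible; double-check the names first)
for name in ["your-source-bucket", "your-target-bucket", "your-query-results-bucket"]:
    bucket = s3.Bucket(name)
    bucket.objects.all().delete()
    bucket.delete()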