Complete Guide: Data Migration from GCP to AWS with Snowflake, Airflow, CI/CD, ETL, and Python

Jeeva-AWSLabsJourney
5 min read · Feb 1, 2025

Table of Contents

1. Setting up the Environment

2. Data Migration from GCP to AWS

3. Processing Data with Snowflake

4. Automating ETL with Apache Airflow

5. Real-Time Data Processing

6. Setting up CI/CD Pipelines

7. Monitoring and Alerts for Airflow and Snowflake

8. Advanced Snowflake Features

9. Testing and Validation

10. Conclusion

Step 1: Setting Up the Environment

As before, you’ll need GCP and AWS accounts, along with essential tools like the AWS CLI, the GCP SDK (gcloud), Terraform, and Snowflake. A quick verification of the CLI setup follows the list below.

  • Install and configure AWS CLI (aws configure).
  • Install and configure GCP SDK (gcloud init).
  • Set up Snowflake for data processing.
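
A quick sanity check of the local tooling might look like this (the Snowflake side can be verified from the Snowflake web UI or the optional SnowSQL client):

aws --version
aws configure        # access key, secret key, default region, output format
gcloud --version
gcloud init          # authenticate and select the GCP project
terraform -version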

Step 2: Data Migration from GCP to AWS

In this step, the process of extracting data from Google Cloud Storage (GCS) and uploading it to AWS S3 remains the same; a minimal batch-copy sketch is shown below. The following sections enhance the solution to support real-time data migration.
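
A minimal batch-copy sketch, assuming both SDKs are already authenticated and that the bucket and prefix names below are placeholders:

import boto3
from google.cloud import storage

GCS_BUCKET = "my-gcs-bucket"   # placeholder source bucket
S3_BUCKET = "my-s3-bucket"     # placeholder destination bucket

gcs_client = storage.Client()
s3_client = boto3.client("s3")

# Copy every object under the data/ prefix from GCS to S3, keeping the same key
for blob in gcs_client.list_blobs(GCS_BUCKET, prefix="data/"):
    s3_client.put_object(
        Bucket=S3_BUCKET,
        Key=blob.name,
        Body=blob.download_as_bytes(),
    )
    print(f"Copied {blob.name} to s3://{S3_BUCKET}/{blob.name}")

For very large datasets, a managed transfer service or streaming/multipart uploads would be preferable to loading each object fully into memory, but the flow is the same.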

Step 3: Processing Data with Snowflake

This step extends the earlier Snowflake setup with real-time loading via Snowpipe; advanced functionality such as data sharing and clustering follows in Step 8.

3.1 Setting Up Snowflake

Create your Snowflake stage and file format as outlined before; a minimal sketch is shown below. Advanced functionality such as data sharing and clustering is added later (explained in Step 8).
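
A minimal sketch, assuming a storage integration named my_s3_integration has already been configured for the bucket (all names are placeholders):

-- CSV file format for the incoming files
CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  SKIP_HEADER = 1;

-- External stage pointing at the S3 landing prefix
CREATE OR REPLACE STAGE my_stage
  URL = 's3://my-s3-bucket/data/'
  STORAGE_INTEGRATION = my_s3_integration
  FILE_FORMAT = my_csv_format;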

3.2 Real-Time Data Loading

For real-time data processing, instead of the standard batch processing, we will set up a continuous stream of data into Snowflake from AWS S3 using Snowpipe.

  1. Enable Snowpipe:
    Snowpipe is Snowflake’s continuous data ingestion service. First, create a pipe that watches your S3 bucket for new files:
CREATE OR REPLACE PIPE my_snowpipe
AUTO_INGEST = TRUE
AS
COPY INTO my_table
FROM @my_stage
FILE_FORMAT = (TYPE = CSV)
ON_ERROR = 'skip_file';

  2. Configure AWS S3 Notifications:
    Configure your S3 bucket to send event notifications when a new file is uploaded. This triggers Snowpipe to automatically load the new data into Snowflake.

aws s3api put-bucket-notification-configuration --bucket my-s3-bucket --notification-configuration file://notification.json
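
The queue to notify is the SQS queue Snowflake created for the pipe: run SHOW PIPES; in Snowflake and copy the notification_channel value. A minimal notification.json might then look like this (the ARN below is a placeholder):

{
  "QueueConfigurations": [
    {
      "Id": "snowpipe-auto-ingest",
      "QueueArn": "arn:aws:sqs:us-east-1:123456789012:sf-snowpipe-REPLACE-ME",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}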

  3. Monitoring Real-Time Data:
    Snowflake’s Task feature can be used to run scheduled SQL queries that monitor data loading, ensuring no data is missed (a COPY_HISTORY check is sketched after this example):
CREATE OR REPLACE TASK my_task
WAREHOUSE = my_warehouse
SCHEDULE = 'USING CRON 0 * * * * UTC'
AS
SELECT COUNT(*) FROM my_table;
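
A sketch of such a check using the COPY_HISTORY table function (reusing the table name from above) that surfaces files which failed to load in the last hour:

SELECT file_name, status, row_count, error_count, first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'MY_TABLE',
    START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())))
WHERE error_count > 0;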

Step 4: Automating ETL with Apache Airflow

Airflow can be used to trigger real-time ETL pipelines that listen for file events from your S3 bucket and process them through Snowflake.

4.1 Real-Time ETL with Apache Airflow

Airflow can be used to trigger real-time ETL pipelines in the following way:

  1. Install Airflow with the Amazon Provider: The S3 sensor and hook used below ship in Airflow’s Amazon provider package; event-driven triggering is handled with sensors rather than a separate streaming mode.
pip install apache-airflow apache-airflow-providers-amazon
  2. Create an Airflow DAG for Real-Time ETL:

Here’s how you can create a DAG that listens for file events from your S3 bucket and processes them through Snowflake in real-time.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime

def process_data_from_s3_to_snowflake():
    # Logic for loading data from S3 to Snowflake using Snowpipe
    # (a fuller sketch follows the sensor example below)
    pass

dag = DAG('real_time_etl', start_date=datetime(2023, 1, 1), catchup=False)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data_from_s3_to_snowflake,
    dag=dag,
)
  3. Real-Time Trigger with Airflow:
    Airflow can trigger tasks when a new file is uploaded to S3. You can use S3KeySensor to wait for a matching object to appear:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

file_sensor_task = S3KeySensor(
    task_id="wait_for_new_file",
    bucket_name="my-s3-bucket",
    bucket_key="data/*.csv",    # key pattern to watch (adjust to your prefix)
    wildcard_match=True,
    aws_conn_id="aws_default",
    timeout=600,
    poke_interval=60,
    mode='poke',
    dag=dag,
)

# Run the processing task once a matching file arrives
file_sensor_task >> process_task
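
As a minimal sketch of what process_data_from_s3_to_snowflake could do, using the snowflake-connector-python package (the connection parameters are placeholders; ALTER PIPE ... REFRESH simply asks Snowpipe to ingest any staged files it has not loaded yet):

import snowflake.connector

def process_data_from_s3_to_snowflake():
    # Connect to Snowflake (account and credentials are placeholders)
    conn = snowflake.connector.connect(
        account="my_account",
        user="my_user",
        password="my_password",
        warehouse="my_warehouse",
        database="my_db",
        schema="public",
    )
    try:
        cur = conn.cursor()
        # Ask Snowpipe to pick up any staged files it has not ingested yet
        cur.execute("ALTER PIPE my_snowpipe REFRESH")
        # Simple sanity check after the load
        cur.execute("SELECT COUNT(*) FROM my_table")
        print("Row count:", cur.fetchone()[0])
    finally:
        conn.close()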

Step 5: Real-Time Data Processing

This step builds on Snowpipe and Airflow to incorporate real-time data ingestion and continuous ETL pipelines.

Real-Time Processing Example:
Using Apache Kafka or AWS Kinesis to stream data into Snowflake is another option. Here’s a conceptual flow for integrating Kafka with Snowflake (a minimal consumer sketch follows the list):

  • Kafka Producer sends real-time data to a Kafka topic.
  • Kafka Consumer consumes data and loads it into AWS S3.
  • Snowpipe automatically loads data from S3 into Snowflake in real time.
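
A minimal sketch of the Kafka-to-S3 leg using the kafka-python package (topic, brokers, and bucket are placeholders; each batch is written to the S3 prefix watched by Snowpipe, and the output format should match the pipe’s FILE_FORMAT):

import json
import time
import boto3
from kafka import KafkaConsumer

S3_BUCKET = "my-s3-bucket"   # placeholder landing bucket watched by Snowpipe

consumer = KafkaConsumer(
    "my_topic",                              # placeholder topic
    bootstrap_servers=["localhost:9092"],    # placeholder brokers
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
s3 = boto3.client("s3")

batch = []
for message in consumer:
    batch.append(message.value)
    if len(batch) >= 100:
        # Write the batch as one newline-delimited JSON file to S3
        key = f"data/events-{int(time.time())}.json"
        body = "\n".join(json.dumps(record) for record in batch)
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=body.encode("utf-8"))
        batch = []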

Step 6: Setting up CI/CD Pipelines

6.1 Version Control and CI/CD Automation

In the CI/CD process, we continue using GitHub Actions to deploy changes to AWS, Airflow DAGs, and Terraform scripts.

  1. GitHub Actions for Terraform: Automate the deployment of infrastructure and resources like Snowflake and AWS services using Terraform.
name: Deploy to AWS and Snowflake
on:
  push:
    branches:
      - main
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Terraform
        uses: hashicorp/setup-terraform@v1
      - name: Terraform Init and Apply
        run: |
          terraform init
          terraform apply -auto-approve
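
If the Airflow DAGs run on Amazon MWAA or any S3-synced deployment, a step like the following could be appended to the same job (the bucket name and credential secrets are placeholders):

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Deploy Airflow DAGs
        run: aws s3 sync dags/ s3://my-airflow-bucket/dags/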

Step 7: Monitoring and Alerts for Airflow and Snowflake

Set up email alerts for failed tasks in Airflow and configure Snowflake’s task feature to monitor data loading and trigger alerts.

7.1 Airflow Monitoring

  1. Airflow UI:
    You can monitor your DAG runs and task statuses through the Airflow UI. Each task shows if it’s running, succeeded, or failed.
  2. Airflow Alerts:
    Set up email alerts for failed tasks using the EmailOperator or SlackWebhookOperator for notifications.
from airflow.operators.email import EmailOperator
alert_task = EmailOperator(
    task_id="send_alert_email",
    to="data-team@example.com",  # placeholder recipient
    subject="Airflow alert: task failed",
    html_content="A task in the real_time_etl DAG failed.",
    dag=dag,
)
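
Alternatively, failure e-mails can be enabled for every task in the DAG through default_args, assuming SMTP settings are configured in airflow.cfg (the recipient address is a placeholder):

from airflow import DAG
from datetime import datetime

default_args = {
    "email": ["data-team@example.com"],  # placeholder recipient
    "email_on_failure": True,            # e-mail when any task fails
    "email_on_retry": False,
}

dag = DAG(
    "real_time_etl",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args=default_args,
)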

7.2 Snowflake Monitoring

  1. Task Monitoring:
    Use Snowflake Tasks to schedule and monitor regular ETL jobs. Create a task that monitors Snowpipe data loads and sends alerts if a load fails.
  2. Setting Up Snowflake Alerts:
    Configure alerts within Snowflake using the Alerts feature for real-time error handling, as sketched below.
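
A sketch of such an alert, assuming an e-mail notification integration named my_email_int already exists and reusing the table and warehouse names from earlier:

CREATE OR REPLACE ALERT snowpipe_load_alert
  WAREHOUSE = my_warehouse
  SCHEDULE = '5 MINUTE'
  IF (EXISTS (
    SELECT 1
    FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
        TABLE_NAME => 'MY_TABLE',
        START_TIME => DATEADD('minute', -5, CURRENT_TIMESTAMP())))
    WHERE error_count > 0))
  THEN CALL SYSTEM$SEND_EMAIL(
    'my_email_int',                  -- assumed notification integration
    'data-team@example.com',         -- placeholder recipient
    'Snowpipe load errors detected',
    'One or more files failed to load into MY_TABLE.');

-- Alerts are created suspended; resume to activate
ALTER ALERT snowpipe_load_alert RESUME;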

Step 8: Advanced Snowflake Features

This section will cover advanced features in Snowflake such as Data Sharing and Clustering to optimize large datasets.

8.1 Data Sharing in Snowflake

Snowflake offers Data Sharing to share data securely between accounts:

  1. Create a Share:
CREATE OR REPLACE SHARE my_share;
  2. Add Objects to the Share:
GRANT USAGE ON DATABASE my_db TO SHARE my_share;
  3. Data Consumer Access: The consumer can then query the shared data as if it were their own (see the sketch below).
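
A sketch of both sides of the share (the schema, table, and account identifiers are placeholders; usage on the database alone is not enough to query a table, so the schema and table grants are included):

-- Provider side: grant access to the objects and add the consumer account
GRANT USAGE ON SCHEMA my_db.public TO SHARE my_share;
GRANT SELECT ON TABLE my_db.public.my_table TO SHARE my_share;
ALTER SHARE my_share ADD ACCOUNTS = xy12345;          -- consumer account (placeholder)

-- Consumer side: create a database from the share and query it
CREATE DATABASE shared_db FROM SHARE provider_acct.my_share;  -- provider account (placeholder)
SELECT COUNT(*) FROM shared_db.public.my_table;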

8.2 Snowflake Clustering

Clustering helps optimize large datasets. You can define a Cluster Key in Snowflake:

CREATE TABLE my_table (
    id INT,
    name STRING,
    data STRING
)
CLUSTER BY (id);

Clustering co-locates rows with similar key values in the same micro-partitions, which improves partition pruning and speeds up queries that filter on the clustering key.
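
A clustering key can also be added to an existing table, and clustering quality inspected, as in this brief sketch:

-- Add a clustering key to an existing table
ALTER TABLE my_table CLUSTER BY (id);

-- Inspect how well the table is clustered on that key
SELECT SYSTEM$CLUSTERING_INFORMATION('my_table', '(id)');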

Step 9: Testing and Validation

9.1 Data Validation

Once the data is migrated, validate it by running simple queries to check the integrity of the data in Snowflake:

SELECT COUNT(*) FROM my_table;
SELECT * FROM my_table LIMIT 10;

9.2 Real-Time Monitoring Validation

Ensure the real-time processing is working by checking the Airflow Logs, Snowpipe Status, and Task Logs.
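
On the Snowflake side, a couple of status queries (reusing the pipe name from earlier) cover Snowpipe and task health:

-- Snowpipe execution state and pending file count
SELECT SYSTEM$PIPE_STATUS('my_snowpipe');

-- Most recent task runs and their outcomes
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY scheduled_time DESC
LIMIT 10;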

Step 10: Conclusion

This guide has covered the entire journey of data migration, real-time processing, and advanced features. You have learned how to build a production-ready data migration pipeline, automate it with Airflow, and monitor it effectively. By incorporating real-time data ingestion and advanced Snowflake features, you now have a fully functional PoC suitable for enterprise environments.
