ETL SQL

Learn Extract Transform Load using SQL & PySpark

AWS Data Engineering - Crash Course

For the past five years, I’ve been diving deep into AWS almost every single day. In this post, I’m going to take you through my journey as an AWS data engineer, sharing the kind of projects I’ve worked on and the services that have been my go-to tools.

This isn’t going to be your typical “zero to hero” AWS tutorial. Instead, I want to give you a real-world perspective of what it’s like to work as a data engineer in the AWS ecosystem. I’ll be breaking down the kinds of projects you can expect, the services you’ll likely use, and most importantly, I’ll be sharing hands-on experiences that go beyond just theoretical knowledge.

Hey everyone! In this post we’ll be focusing primarily on building data lakes and data warehouses, the bread and butter of data engineering. I’ll walk you through the AWS services I’ve used, sharing their pros and cons, and throwing in some practical code examples along the way. With more than 15 years of experience building data warehouses and data lakes across different industries, I’ve got a lot of insights to share.

If you’re ready to get a genuine, experience-based look into AWS data engineering, stick with me. Trust me, you’re going to learn a lot from the next few minutes.

Let’s dive in!

Data Warehouse or Data Lake

One of the most important and popular types of projects in my AWS journey has been data warehouse migration. Picture this: an enterprise with an existing data warehouse running on traditional systems wants to move everything to Amazon Redshift.

Let me break down what this typically looks like. These migrations involve converting existing data pipelines and ETL processes to AWS services. All those SQL files that were running in their traditional system? They’ll now run on Amazon Redshift. You’ll schedule these to run at desired frequencies, and sometimes that means doing some serious SQL conversion.

I’ll give you a real-world example. In one project, we were migrating from Teradata to Redshift, and we hit a classic challenge – user-defined functions. The original system had UDFs written in C, which Redshift doesn’t support. Redshift supports UDFs in SQL and Python, so I had to convert those C-language methods into Python-compatible functions. That’s the kind of technical gymnastics you’ll often encounter in migration projects.
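
To make the UDF conversion concrete, here is a minimal sketch of the pattern, not the code from that project. The function name `f_mask_account` and its masking rule are hypothetical. The idea is to write and test the logic as plain Python first, then paste the same body into a Redshift `CREATE FUNCTION ... LANGUAGE plpythonu` statement.

```python
def mask_account(account_no):
    """Hypothetical rule: keep the last four characters and mask the rest."""
    if account_no is None:  # guard against NULL inputs arriving as None
        return None
    text = str(account_no)
    if len(text) <= 4:
        return text
    return "*" * (len(text) - 4) + text[-4:]


# The same body wrapped in Redshift DDL (hypothetical function name).
# Redshift Python UDFs use the plpythonu language, which is based on
# Python 2.7, so the body should avoid Python 3-only syntax.
CREATE_UDF_SQL = """
CREATE OR REPLACE FUNCTION f_mask_account (account_no VARCHAR)
RETURNS VARCHAR
IMMUTABLE
AS $$
    if account_no is None:
        return None
    text = str(account_no)
    if len(text) <= 4:
        return text
    return "*" * (len(text) - 4) + text[-4:]
$$ LANGUAGE plpythonu;
"""

print(mask_account("1234567890"))  # ******7890
```

Testing the plain function locally before registering it keeps the debugging loop short, since errors inside a UDF body only surface when a query calls it.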

If you want to dive deeper into Amazon Redshift, I’ve got a detailed video linked in the description. Check it out to understand its internal architecture and how it works.

When working with Redshift, you’ve got a few key approaches:

– Run SQL commands via schedulers like Amazon Managed Workflows for Apache Airflow (MWAA)

– Create stored procedures for more dynamic parameter handling

I’ve spent almost every day of the last five years working with Redshift, and let me tell you, migration projects are never straightforward. They require a mix of technical skills, problem-solving, and understanding both the source and target systems.

For beginners wanting to understand more, I’ll link some starter resources in the description. Trust me, these migration projects are where you really learn the nitty-gritty of cloud data engineering.

Let’s talk about another critical project type: creating a data lake in AWS. Enterprises often want to build their own data lakes, and there are typically two main strategies.

First scenario: An enterprise already has a data warehouse and wants to start data lake hydration by pulling historical data directly from that existing warehouse. It’s like taking your current data repository and expanding it into a more comprehensive data lake.

I’ll share a challenging project I worked on. We were dealing with an on-premises SQL Server containing over five petabytes of data. Moving that massive volume of data into an S3 data lake? That’s not a simple task.

We developed a novel approach:

– For cold data, we created backups from SQL Server and moved them as-is

– For hot data, we built a custom PySpark application that could:

  – Connect to the local SQL Server instance

  – Fetch and export data

  – Move data into the S3 data lake

You might wonder, “Why build a custom app when AWS Database Migration Service (DMS) exists?” Great question. We needed custom transformations:

– Changing data types

– Applying specific data modifications

– Creating a user-friendly tool that even non-technical business analysts could use
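
A hot-data export along these lines can be sketched as follows. This is an illustrative outline under stated assumptions, not the application from the project: the helper names `jdbc_read_options` and `export_table`, the port, the fetch size, and the Parquet output format are all assumptions, while the option keys are the standard Spark JDBC data source options.

```python
def jdbc_read_options(host, database, table, user, password,
                      partition_column, lower_bound, upper_bound,
                      num_partitions=8):
    """Build Spark JDBC reader options for a parallel SQL Server read."""
    return {
        # Port 1433 is SQL Server's default; adjust for your instance.
        "url": f"jdbc:sqlserver://{host}:1433;databaseName={database}",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "dbtable": table,
        "user": user,
        "password": password,
        # These four options split the read into parallel range queries.
        "partitionColumn": partition_column,
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
        "fetchsize": "10000",  # assumed value, tune per table
    }


def export_table(spark, options, casts, target_path):
    """Read one table over JDBC, cast selected columns, write to S3.

    `spark` is an existing SparkSession. `casts` maps a column name to a
    Spark SQL type string, for example {"amount": "decimal(18,2)"}.
    Parquet is an assumed output format for this sketch.
    """
    df = spark.read.format("jdbc").options(**options).load()
    for column, spark_type in casts.items():
        df = df.withColumn(column, df[column].cast(spark_type))
    df.write.mode("overwrite").parquet(target_path)
```

Keeping the option building separate from the Spark calls makes the connection settings easy to review and test without a cluster.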

I personally wrote every line of code and received great appreciation for the solution. I’ll link the AWS public blog about this project in the description for those who want to dive deeper.

When building a data lake, data hydration is your first critical step. Most enterprises start by exporting data from existing databases or data warehouses – essentially transferring their historical data into the new lake infrastructure.

Alright, here’s what we’re going to do in this video. I’ll be walking you through real-world examples, sharing actual code, and showing how we leverage key AWS services to build data infrastructure.

We’ll focus on:

– Amazon Redshift for data mart and warehouse development

– S3 Data Lake creation

– AWS Glue and Amazon EMR for PySpark transformations

– Managed Airflow for scheduling and job orchestration

I know many of you want hands-on learning. So I’ll demonstrate:

– Creating scripts

– Scheduling and executing jobs

– Data loading techniques

Let me be clear – this isn’t a deep-dive masterclass or a “zero to hero” tutorial. This is a practical overview of the kind of projects you can expect in AWS data engineering.

My goal? Give you enough practical insight that if you’re a data engineer with some background, you can log into AWS and start building data pipelines. And for deeper learning, I always recommend the official AWS documentation and public blogs – they’re treasure troves of knowledge.

Later on in the video, we’ll go hands-on with:

– Amazon Redshift

– AWS Glue

– Amazon EMR

– Managed Airflow

These four components form the backbone of most AWS data engineering projects. While we’ll touch on other services like Lambda, EC2, CloudWatch, and SQS, we’ll stay focused on building a data mart, warehouse, and lake.

I’ll share some code to help you see how these services connect and work together. Think of this as a beginner-friendly tutorial to kickstart your AWS data engineering journey.

Ready to dive into the AWS ecosystem? Let’s start with Amazon Redshift!

Amazon Redshift – Overview

Amazon Redshift is a fully managed data warehouse and analytics service that scales well and is easy to manage. It is a massively parallel processing (MPP) engine that executes SQL queries efficiently, and its columnar storage is highly optimized for analytics queries. It offers four distribution styles (AUTO, EVEN, KEY, and ALL) for storing and retrieving data efficiently, and sort keys can speed up data retrieval significantly. With Redshift Spectrum, you can also query data that resides in S3 buckets directly, which makes integration with a data lake efficient.

AWS Glue – Overview

AWS Glue is a serverless service, so you don’t have to procure or maintain servers or nodes. It is primarily used for building and running Spark ETL jobs, and it can also run Python shell jobs. Another widely used feature is the Glue Data Catalog, which stores metadata about tables. The catalog is also used for tables built with Hive or Spark on EMR, and it integrates well with other AWS services such as Lake Formation, Athena, EMR, and Redshift (as an external database).

Moreover, the Glue crawler feature discovers the underlying data and its structure automatically from the source. Glue Studio is the visual interface for creating ETL jobs quickly.

Apache Hudi – Overview

Apache Hudi is an open-source table format that is popular for building data lakes. It supports ACID transactions, so you can run inserts, updates, and deletes on Hudi tables. It also versions table data and supports rollback. Its integration with AWS services such as AWS Glue, Athena, and Redshift makes it a popular choice in the AWS ecosystem.
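
The insert/update behaviour can be pictured with a small pure-Python model. This is a conceptual sketch only, not Hudi's implementation: the `upsert` function and the sample rows are hypothetical, and it models just two ideas, a record key that identifies a row and a precombine field that picks a winner when one batch carries several versions of the same key.

```python
def upsert(table, incoming, key_field, precombine_field):
    """Conceptual model of an upsert keyed on `key_field`.

    `table` maps record key -> row (a dict). A new mapping is returned and
    the input table is left untouched.
    """
    # 1) De-duplicate the incoming batch: among rows sharing a key, keep the
    #    one with the largest precombine value (ties keep the first row seen
    #    in this sketch).
    batch = {}
    for row in incoming:
        key = row[key_field]
        if key not in batch or row[precombine_field] > batch[key][precombine_field]:
            batch[key] = row

    # 2) Merge: unseen keys are inserted, existing keys are overwritten.
    merged = dict(table)
    merged.update(batch)
    return merged


# Hypothetical sample data
existing = {1: {"customer_id": 1, "email": "a@x.com", "load_date": "2025-01-01"}}
new_rows = [
    {"customer_id": 1, "email": "b@x.com", "load_date": "2025-01-02"},
    {"customer_id": 1, "email": "c@x.com", "load_date": "2025-01-03"},
    {"customer_id": 2, "email": "d@x.com", "load_date": "2025-01-03"},
]
result = upsert(existing, new_rows, "customer_id", "load_date")
print(result[1]["email"], len(result))  # c@x.com 2
```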

Amazon EMR – Overview

Amazon EMR (Elastic MapReduce) is a managed big data platform in AWS. It is an elastic and cost-effective platform for running big data workloads, and its integration with other AWS services makes it a popular choice for running Spark jobs. It is highly configurable, so you can choose the right cluster for each workload, and it scales well for performance. The platform is also secure and compliant with data security guidelines.

Managed Airflow – Overview

Managed Airflow, officially Amazon Managed Workflows for Apache Airflow (MWAA), is a Python-based workflow orchestration tool used to schedule complex workflows within the AWS ecosystem. It is a managed service, so you don’t have to set up the Airflow infrastructure yourself. Its integration with other AWS services makes it a popular choice for running data pipelines. It scales automatically for efficient operations, and it provides secure and compliant workflow orchestration. Last but not least, it is cost efficient when you consider that it takes away the operational overhead of managing clusters.

Hands-on exercise building data pipeline

The video and this post cover five exercises, and you can download the sample data and code to practice.

In the first exercise, I show how to use an AWS Glue crawler to parse an input file and create a table in the Glue Catalog.

In the second exercise, we use an AWS Glue PySpark job to load data from CSV files into data lake Hudi tables.

In the third exercise, we use Amazon EMR to read the data lake Hudi tables and create analytics Hudi tables. If you follow the medallion architecture, this is like reading silver-layer data and transforming it into the gold layer.

In the fourth exercise, I show how to read Hudi tables directly in Amazon Redshift and create snapshot tables to consume the analytics dataset.

Finally, in the fifth exercise, we use Managed Airflow to orchestrate and run the end-to-end pipeline covering the steps mentioned earlier.

If you are an AWS beginner, I am sure you will learn a lot from this video and post. However, this is not an AWS zero-to-hero masterclass. It is more of a crash course in which I share how you can quickly build solutions in AWS using the popular services.

I have referred to the following additional videos. Do check them out as well for a better understanding.

Amazon Redshift for beginners: https://youtu.be/dmsuzIOzmIs
AWS DataLake for beginners : https://youtu.be/m-WEGgYq25c

Feel free to reach out to me as well: support@etl-sql.com
If you wish to download the presentation slides, sample data files, and source code (the AWS Glue job, Amazon EMR PySpark application, Amazon Redshift SQL script, and Managed Airflow DAG) used in the crash course video, check the link below:
https://mailchi.mp/45b9673b727b/aws-data-engineering-crash-course

Exercise – 1 (Create & run Glue crawler to create glue table)

The first step is to run a Glue crawler over the input data files and review the tables it creates. Refer to the video for more details.
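
If you prefer scripting this step over using the console, a crawler can also be created and started with boto3. The sketch below is one possible approach, not what the video shows. The helper names and the crawler name are hypothetical, and the Glue client is passed in as an argument so the logic can be tried without AWS credentials.

```python
import time


def crawler_request(name, role_arn, database, s3_path):
    """Build the keyword arguments for the Glue create_crawler call."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def run_crawler(glue, request, poll_seconds=15):
    """Create the crawler, start it, and wait until it reports READY again.

    `glue` is a boto3 Glue client, i.e. boto3.client("glue").
    """
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])
    while True:
        # A crawler moves through RUNNING and STOPPING before READY.
        state = glue.get_crawler(Name=request["Name"])["Crawler"]["State"]
        if state == "READY":
            return state
        time.sleep(poll_seconds)


# Example request with a hypothetical crawler name and placeholder values
request = crawler_request(
    name="crawl_raw_customers",
    role_arn="<IAM_ROLE_ARN>",
    database="db_etl_sql",
    s3_path="s3://<enter_your_bucket_name>/<enter_input_raw_files_path>/customers/",
)
```

With real credentials you would then call `run_crawler(boto3.client("glue"), request)`. Note that `create_crawler` fails if a crawler with the same name already exists.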

Exercise – 2 (Create & run Glue spark job)

'''
Description: 
Step1: The Glue job reads 3 input data files & creates a dataframe for each file:
        customers
        products
        transactions
Step2: Basic string transformations are applied & a new load_date column is added
Step3: Each dataframe is written into a Hudi table & registered in the Glue Catalog

This process is equivalent to reading data from the input (bronze) layer
& generating the data-lake (silver) layer

Parameters used in the Glue Job:
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
--conf spark.sql.hive.convertMetastoreParquet=false
--datalake-formats hudi
--bucket_name <enter_your_bucket_name>
--ip_path <enter_input_raw_files_path>
--op_path <enter_output_silver_files_path>

Some hardcoded values are used in the code like databasename, tablename.
Feel free to change it as per your requirement.

'''

import sys
# DataFrame is used in the type hints below, so import it explicitly
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, current_date, initcap, lower, upper
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.context import SparkContext

class ETLSQL:
    def __init__(self):
        params = ["bucket_name","ip_path","op_path"]
        args = getResolvedOptions(sys.argv, params)
        
        self.context = GlueContext(SparkContext.getOrCreate())
        self.spark = self.context.spark_session
        self.job = Job(self.context)
        self.bucket_name = args["bucket_name"]
        self.ip_path = args["ip_path"]
        self.op_path = args["op_path"]
    
    def read_raw_data(self, path: str, format: str = "csv") -> DataFrame:
        """Read raw data from S3"""
        return self.spark.read.format(format) \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(path)
    
    def process_customers(self, df: DataFrame) -> DataFrame:
        """Process customer data"""
        return df.withColumn("email", lower(col("email"))) \
            .withColumn("country", upper(col("country"))) \
            .withColumn("load_date", current_date())
    
    def process_products(self, df: DataFrame) -> DataFrame:
        """Process product data"""
        return df.withColumn("category", initcap(col("category"))) \
            .withColumn("subcategory", initcap(col("subcategory"))) \
            .withColumn("load_date", current_date())
    
    def process_transactions(self, df: DataFrame) -> DataFrame:
        """Process transactions data"""
        return df.withColumn("load_date", current_date())
    
    def write_to_hudi(self, df: DataFrame, table_name: str, key_field: str):
        """Write DataFrame to Hudi table"""
        hudi_options = {
            'hoodie.table.name': table_name,
            "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
            'hoodie.datasource.write.recordkey.field': key_field,
            'hoodie.datasource.write.precombine.field': 'load_date',
            'hoodie.datasource.write.operation': 'upsert',
            "hoodie.datasource.write.partitionpath.field": "load_date",
            "hoodie.parquet.compression.codec": "gzip",
            "hoodie.datasource.write.hive_style_partitioning": "true",
            'hoodie.datasource.hive_sync.enable': 'true',
            'hoodie.datasource.hive_sync.database': 'db_etl_sql',
            'hoodie.datasource.hive_sync.table': table_name,
            'hoodie.datasource.hive_sync.partition_fields': 'load_date',
            "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
            "hoodie.datasource.hive_sync.use_jdbc": "false",
            "hoodie.datasource.hive_sync.mode": "hms"
        }
        
        df.write.format('hudi') \
            .options(**hudi_options) \
            .mode('append') \
            .save(f's3://{self.bucket_name}/{self.op_path}/data-lake/{table_name}')

    def run(self):
        # Read raw data
        customers_df = self.read_raw_data(f"s3://{self.bucket_name}/{self.ip_path}/customers/")
        products_df = self.read_raw_data(f"s3://{self.bucket_name}/{self.ip_path}/products/")
        transactions_df = self.read_raw_data(f"s3://{self.bucket_name}/{self.ip_path}/transactions/")

        # Process data
        processed_customers = self.process_customers(customers_df)
        processed_products = self.process_products(products_df)
        processed_transactions = self.process_transactions(transactions_df)
        
        # Write to Hudi tables
        self.write_to_hudi(processed_customers, "customers", "customer_id")
        self.write_to_hudi(processed_products, "products", "product_id")
        self.write_to_hudi(processed_transactions, "transactions", "transaction_id")

        self.job.commit()

if __name__ == "__main__":
    etl = ETLSQL()
    etl.run()
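
The job parameters listed in the description above can also be supplied when starting the job programmatically. Here is a minimal boto3 sketch, assuming the Glue job has already been created with this script. The helper names are hypothetical, and the Glue client is passed in as an argument.

```python
def glue_job_arguments(bucket_name, ip_path, op_path):
    """Arguments matching the parameters listed in the job description."""
    return {
        # Job arguments form a map, so --conf can appear only once.
        # Additional Spark settings are chained inside its value.
        "--conf": (
            "spark.serializer=org.apache.spark.serializer.KryoSerializer"
            " --conf spark.sql.hive.convertMetastoreParquet=false"
        ),
        "--datalake-formats": "hudi",
        "--bucket_name": bucket_name,
        "--ip_path": ip_path,
        "--op_path": op_path,
    }


def start_job(glue, job_name, arguments):
    """Start a run of an existing Glue job and return its run id.

    `glue` is a boto3 Glue client, i.e. boto3.client("glue").
    """
    response = glue.start_job_run(JobName=job_name, Arguments=arguments)
    return response["JobRunId"]


# Placeholder values, to be replaced with your own bucket and paths
arguments = glue_job_arguments(
    "<enter_your_bucket_name>",
    "<enter_input_raw_files_path>",
    "<enter_output_silver_files_path>",
)
```

Arguments passed at start time override the defaults saved on the job, which is handy when the same script is reused across environments.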

Exercise – 3 (Create & run EMR Pyspark job to create glue tables)

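import sys
from typing import List

# DataFrame and List are used in the type hints below, so import them explicitly
from pyspark.sql import DataFrame, SparkSession
# Note: sum, min & max intentionally shadow the Python built-ins in this script
from pyspark.sql.functions import avg, col, count, countDistinct, current_date, max, min, sum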

""" 
Description:
The EMR job will read data from "data-lake" layer (SILVER) which is generated in the previous step by Glue job & generate "analytics" layer (GOLD).

Step1: Read 3 Hudi tables from "data-lake" path: customers, products & transactions & generate dataframe for each
Step2: Apply transformation to calculate customer_metrics
Step3: Apply transformation to calculate product analytics
Step4: Add new column process_date & write the 2 aggregate metrics into 2 new hudi tables in "analytics" path

Input Parameters:
        a) S3 bucket name
        b) S3 output path for Hudi tables

Script usage: 

spark-submit --deploy-mode cluster \
        --jars /usr/lib/hudi/hudi-spark-bundle.jar \
        --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
        --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
        --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
        EMR_job.py <s3_bucket_name> <s3_op_path_for_hudi_table>

There are some hardcoded values used (like data-lake, analytics path) & you can parameterise & customise it as per your requirement.
"""

class DataLakeProcessor:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("DataLakeProcessor") \
            .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.sql.hive.convertMetastoreParquet', 'false') \
            .config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
            .enableHiveSupport() \
            .getOrCreate()
        self.bucket_name = sys.argv[1]
        self.op_folder = sys.argv[2]
    
    def read_hudi_table(self, table_name: str) -> DataFrame:
        """Read data from Hudi table"""
        return self.spark.read.format('org.apache.hudi') \
            .load(f's3://{self.bucket_name}/{self.op_folder}/data-lake/{table_name}/*')
    
    def calculate_customer_metrics(self) -> DataFrame:
        """Calculate customer purchase metrics"""
        transactions = self.read_hudi_table("transactions")
        
        return transactions.groupBy("customer_id").agg(
                count("transaction_id").alias("total_transactions"),
                sum(col("unit_price") * col("quantity")).alias("total_spent"),
                avg(col("unit_price") * col("quantity")).alias("avg_transaction_amount"),
                min("transaction_date").alias("first_purchase"),
                max("transaction_date").alias("last_purchase")
            )
    
    def create_product_analytics(self) -> DataFrame:
        """Create product analytics"""
        transactions = self.read_hudi_table("transactions")
        products = self.read_hudi_table("products")
        
        return transactions.join(products, "product_id") \
            .groupBy("category", "subcategory") \
            .agg(
                sum("quantity").alias("total_units_sold"),
                sum(col("unit_price") * col("quantity")).alias("total_revenue"),
                avg("unit_price").alias("avg_price"),
                countDistinct("customer_id").alias("unique_customers")
            )
    
    def write_analytics_to_hudi(self, df: DataFrame, table_name: str, key_fields: List[str]):
        """Write analytics results to Hudi"""
        hudi_options = {
            'hoodie.table.name': table_name,
            "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
            'hoodie.datasource.write.recordkey.field': ','.join(key_fields),
            'hoodie.datasource.write.precombine.field': 'process_date',
            "hoodie.datasource.write.partitionpath.field": "process_date",
            "hoodie.parquet.compression.codec": "gzip",
            "hoodie.datasource.write.hive_style_partitioning": "true",
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.datasource.hive_sync.enable': 'true',
            'hoodie.datasource.hive_sync.database': 'db_etl_sql',
            'hoodie.datasource.hive_sync.table': table_name,
            'hoodie.datasource.hive_sync.partition_fields': 'process_date',
            "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
            "hoodie.datasource.hive_sync.use_jdbc": "false",
            "hoodie.datasource.hive_sync.mode": "hms"
        }

        df.withColumn("process_date", current_date()) \
            .write.format('org.apache.hudi') \
            .options(**hudi_options) \
            .mode('append') \
            .save(f's3://{self.bucket_name}/{self.op_folder}/analytics/{table_name}')

    def run(self):
        # Calculate metrics
        customer_metrics_df = self.calculate_customer_metrics()
        product_analytics_df = self.create_product_analytics()
        
        # Write results
        self.write_analytics_to_hudi(
            customer_metrics_df,
            "customer_metrics",
            ["customer_id"]
        )
        self.write_analytics_to_hudi(
            product_analytics_df,
            "product_analytics",
            ["category", "subcategory"]
        )

if __name__ == "__main__":
    processor = DataLakeProcessor()
    processor.run()
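
To see what the customer metrics aggregation computes, here is the same logic in plain Python on a few made-up rows. It is only a small model for checking the arithmetic: the sample data is hypothetical, and it assumes every row has a non-null transaction_id (Spark's count skips nulls).

```python
from collections import defaultdict


def customer_metrics(transactions):
    """Plain-Python equivalent of the groupBy("customer_id").agg(...) step."""
    grouped = defaultdict(list)
    for row in transactions:
        grouped[row["customer_id"]].append(row)

    metrics = {}
    for customer_id, rows in grouped.items():
        amounts = [r["unit_price"] * r["quantity"] for r in rows]
        dates = [r["transaction_date"] for r in rows]
        metrics[customer_id] = {
            "total_transactions": len(rows),
            "total_spent": sum(amounts),
            "avg_transaction_amount": sum(amounts) / len(amounts),
            "first_purchase": min(dates),
            "last_purchase": max(dates),
        }
    return metrics


# Hypothetical sample rows
sample = [
    {"transaction_id": 1, "customer_id": 1, "unit_price": 10.0, "quantity": 2,
     "transaction_date": "2025-01-05"},
    {"transaction_id": 2, "customer_id": 1, "unit_price": 5.0, "quantity": 4,
     "transaction_date": "2025-01-20"},
    {"transaction_id": 3, "customer_id": 2, "unit_price": 3.0, "quantity": 1,
     "transaction_date": "2025-01-07"},
]
result = customer_metrics(sample)
print(result[1]["total_spent"], result[1]["avg_transaction_amount"])  # 40.0 20.0
```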

Exercise – 4 (Create & run redshift sql job reading Glue tables)

/*
The Redshift SQL job creates 5 snapshot tables by reading the 2 tables in the analytics (GOLD) layer that the EMR job generated in the previous step.
Create an external schema pointing to the Glue catalog first.

Replace <IAM_ROLE_ARN> in the SQL command below with your IAM role ARN.

Some hardcoded values are used, such as the schema name, database name & table names. You can parameterize or customise them as per your requirement.
*/


CREATE EXTERNAL SCHEMA db_etl_sql_ext
FROM DATA CATALOG 
DATABASE 'db_etl_sql' 
IAM_ROLE '<IAM_ROLE_ARN>'
CREATE EXTERNAL DATABASE IF NOT EXISTS;


select * from db_etl_sql_ext.customers;

create schema sch_reporting;

CREATE TABLE sch_reporting.customer_segment_value (
    customer_segment character varying(12) ENCODE zstd distkey,
    customer_count bigint ENCODE zstd,
    avg_spent double precision ENCODE zstd,
    avg_transactions bigint ENCODE zstd,
    spst_date date default CURRENT_DATE
)
DISTSTYLE KEY;

CREATE TABLE sch_reporting.customer_activity_analysis (
    total_customers bigint ENCODE zstd,
    active_90_days bigint ENCODE zstd,
    inactive_90_days bigint ENCODE zstd,
    avg_customer_lifespan bigint ENCODE zstd,
    spst_date date default CURRENT_DATE
)
DISTSTYLE EVEN;

CREATE TABLE sch_reporting.category_performance_analysis  (
    category character varying(100) ENCODE zstd distkey,
    category_revenue double precision ENCODE zstd,
    category_units bigint ENCODE zstd,
    total_customers bigint ENCODE zstd,
    category_avg_price double precision ENCODE zstd,
    revenue_per_customer double precision ENCODE zstd,
    spst_date date default CURRENT_DATE
)
DISTSTYLE KEY;



CREATE TABLE sch_reporting.top_performing_subcategories  (
    category character varying(100) ENCODE zstd distkey,
    subcategory character varying(100) ENCODE zstd,
    total_revenue double precision ENCODE zstd,
    total_units_sold bigint ENCODE zstd,
    category_revenue_share double precision ENCODE zstd,
    spst_date date default CURRENT_DATE
)
DISTSTYLE KEY;




CREATE TABLE sch_reporting.customer_purchase_frequency  (
    frequency_segment character varying(29) ENCODE zstd distkey,
    customer_count bigint ENCODE zstd,
    avg_total_spent double precision ENCODE zstd,
    avg_transaction_value double precision ENCODE zstd,
    spst_date date default CURRENT_DATE
)
DISTSTYLE KEY;



INSERT INTO sch_reporting.customer_segment_value
--customer_segment_value
WITH customer_segments AS (
    SELECT 
        CASE 
            WHEN total_spent >= 1000 THEN 'High Value'
            WHEN total_spent >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END as customer_segment,
        COUNT(*) as customer_count,
        AVG(total_spent) as avg_spent,
        AVG(total_transactions) as avg_transactions
    FROM db_etl_sql_ext.customer_metrics
    GROUP BY 
        CASE 
            WHEN total_spent >= 1000 THEN 'High Value'
            WHEN total_spent >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END
)
SELECT * FROM customer_segments
ORDER BY avg_spent DESC;


/* This timestamp conversion is required because the Hudi tables store timestamps as numbers (the division by 1000000 treats them as microseconds since the epoch) */

INSERT INTO sch_reporting.customer_activity_analysis
--customer_activity_analysis
SELECT 
    COUNT(*) as total_customers,
    COUNT(CASE WHEN DATEDIFF('day',TIMESTAMP 'epoch' + last_purchase / 1000000 * INTERVAL '1 second' , CURRENT_DATE) <= 90 THEN 1 END) as active_90_days,
    COUNT(CASE WHEN DATEDIFF('day', TIMESTAMP 'epoch' + last_purchase / 1000000 * INTERVAL '1 second', CURRENT_DATE) > 90 THEN 1 END) as inactive_90_days,
    AVG(DATEDIFF('day', TIMESTAMP 'epoch' + first_purchase / 1000000 * INTERVAL '1 second', TIMESTAMP 'epoch' + last_purchase / 1000000 * INTERVAL '1 second')) as avg_customer_lifespan
FROM db_etl_sql_ext.customer_metrics;


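INSERT INTO  sch_reporting.category_performance_analysis
--category_performance_analysis
--note: unique_customers is distinct per subcategory, so SUM() counts a customer once for every subcategory they bought from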
SELECT 
    category,
    SUM(total_revenue) as category_revenue,
    SUM(total_units_sold) as category_units,
    SUM(unique_customers) as total_customers,
    AVG(avg_price) as category_avg_price,
    SUM(total_revenue)/SUM(unique_customers) as revenue_per_customer
FROM db_etl_sql_ext.product_analytics
GROUP BY category
ORDER BY category_revenue DESC;


INSERT INTO sch_reporting.top_performing_subcategories
--top_performing_subcategories
SELECT 
    category,
    subcategory,
    total_revenue,
    total_units_sold,
    ROUND((total_revenue * 100.0 / SUM(total_revenue) OVER (PARTITION BY category)), 2) as category_revenue_share
FROM db_etl_sql_ext.product_analytics
ORDER BY total_revenue DESC;


INSERT INTO  sch_reporting.customer_purchase_frequency
--customer_purchase_frequency
SELECT 
    CASE 
        WHEN total_transactions >= 10 THEN 'Frequent (10+ transactions)'
        WHEN total_transactions >= 5 THEN 'Regular (5-9 transactions)'
        ELSE 'Occasional (1-4 transactions)'
    END as frequency_segment,
    COUNT(*) as customer_count,
    AVG(total_spent) as avg_total_spent,
    AVG(avg_transaction_amount) as avg_transaction_value
FROM db_etl_sql_ext.customer_metrics
GROUP BY 
    CASE 
        WHEN total_transactions >= 10 THEN 'Frequent (10+ transactions)'
        WHEN total_transactions >= 5 THEN 'Regular (5-9 transactions)'
        ELSE 'Occasional (1-4 transactions)'
    END;
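
The epoch arithmetic used for customer_activity_analysis above can be sanity-checked outside Redshift. The sketch below mirrors it in Python, assuming the column holds integer microseconds since 1970-01-01 UTC (so the division is integer division). The helper names and sample values are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def micros_to_timestamp(micros):
    """Mirror of: TIMESTAMP 'epoch' + micros / 1000000 * INTERVAL '1 second'."""
    return EPOCH + timedelta(seconds=micros // 1_000_000)


def days_between(start, end):
    """Mirror of DATEDIFF('day', start, end): calendar-day boundaries crossed."""
    return (end.date() - start.date()).days


# Hypothetical values as they might appear in first_purchase / last_purchase
first_purchase = 1_700_000_000_000_000
last_purchase = first_purchase + 10 * 86_400 * 1_000_000  # ten days later

print(micros_to_timestamp(first_purchase))  # 2023-11-14 22:13:20
print(days_between(micros_to_timestamp(first_purchase),
                   micros_to_timestamp(last_purchase)))  # 10
```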

Exercise – 5 (Create MWAA Dag & call Glue , EMR & Redshift job)

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
"""
This Airflow DAG first runs the Glue job, which reads the raw data files & generates 3 Hudi tables in the data-lake path.
It then triggers the EMR job, which reads the 3 Hudi tables loaded in the previous step & generates 2 Hudi tables in the analytics path.
Finally, it calls a Redshift stored procedure to generate snapshot tables for reporting.

Replace the string values enclosed in <> before executing the job.
Also create a Redshift connection in Airflow & make sure the stored procedure sch_reporting.load_snapshot_data() called in Step 4 exists in Redshift.
"""
class EtlSqlPipeline:
    def __init__(self):
        self.default_args = {
            'owner': 'etl-sql',
            'depends_on_past': False,
            'start_date': datetime(2025, 1, 1),
            'retries': 1,
            'retry_delay': timedelta(minutes=5)
        }
        self.RS_CONN_ID = '<enter Redshift connection name>'
        self.EMR_cluster_id = '<enter EMR Cluster ID>'

        
        self.dag = DAG(
            dag_id='etl_sql_demo_pipeline',
            default_args=self.default_args,
            schedule_interval=None,
            description="Demo etl-sql pipeline that runs Glue, EMR & Redshift loads",
            catchup=False,
            tags=["etl-sql","demo"],
        )
        
        self.create_tasks()
    
    def create_tasks(self):
        # Start
        start_pipeline = DummyOperator(task_id="LetsGo", dag=self.dag)

        # Glue ETL Job
        Step1_GlueJob = GlueJobOperator(
            task_id='Step1_GlueJob',
            job_name='<enter glue job name>',
            script_args={
                "--bucket_name": "<enter s3 bucket name>",
                "--ip_path": "<enter input raw data files path>",
                "--op_path": "<enter output hudi table path>",
            },
            dag=self.dag
        )
        
        # EMR Analytics Job
        emr_steps = [{
            'Name': 'Generate_Gold_Layer',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    "spark-submit",
                    "--deploy-mode", "cluster", 
                    "--jars", "/usr/lib/hudi/hudi-spark-bundle.jar",
                    "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
                    "--conf", "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" ,
                    "--conf", "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" ,
                    "<enter S3 bucket path for EMR job>",  # S3 location of the EMR PySpark script
                    "<enter output S3 bucket name>",       # read by the script as sys.argv[1]
                    "<enter output hudi table S3 path>"    # read by the script as sys.argv[2]
                ]
            }
        }]
        
        Step2_EmrStart = EmrAddStepsOperator(
            task_id='Step2_EmrStart',
            job_flow_id=self.EMR_cluster_id,
            steps=emr_steps,
            dag=self.dag
        )
        
        Step3_EmrWatcher = EmrStepSensor(
            task_id='Step3_EmrWatcher',
            job_flow_id=self.EMR_cluster_id,
            step_id='{{ task_instance.xcom_pull("Step2_EmrStart", key="return_value")[0] }}',
            dag=self.dag
        )
        
        # Redshift Load Tasks
        Step4_RedshiftSpst = PostgresOperator(
            task_id='Step4_RedshiftSpst',
            sql='call sch_reporting.load_snapshot_data()',
            postgres_conn_id=self.RS_CONN_ID,
            dag=self.dag
        )
        
        #End
        end_pipeline = DummyOperator(task_id="YouWon", dag=self.dag)
        
        # Set dependencies
        start_pipeline >> Step1_GlueJob >> Step2_EmrStart >> Step3_EmrWatcher >> Step4_RedshiftSpst >> end_pipeline

# Create pipeline
pipeline = EtlSqlPipeline()
dag = pipeline.dag

I hope this crash course helps you get started quickly in AWS data engineering. If you need any support with these AWS services, feel free to reach out to me via email: support@etl-sql.com