
PySpark OOP – a must-know for people switching from a SQL background

Many people who move from a SQL background to PySpark get into the habit of writing PySpark code in a sequential manner. This is because they are used to writing SQL, where one query simply follows the other.

But in PySpark, they should not ignore the Object Oriented Programming (OOP) style. In this video I have shared a very easy way to move from writing PySpark code in a sequential manner to a functional style, and finally to an OOP style.

For demonstration purposes, I will create an ETL pipeline in AWS Glue using PySpark that reads data from two CSV files. After applying transformations such as joins and aggregations, it writes the data back to S3 in Parquet format.

Watch the video now
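
The exact contents of the two source files are not shown here, but the transformations below assume that sales_data.csv carries at least product_id, quantity, price and discount, and that product_data.csv carries at least id and category (where the price column lives is an assumption on my part). A minimal sketch for mocking such data locally before wiring the job into Glue:

# Minimal local sketch of the assumed inputs; column names follow the Glue job below,
# sample values and the placement of "price" are assumptions for illustration only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("sample-data-sketch").getOrCreate()

sales_df = spark.createDataFrame(
    [
        (1, 101, 2, 10.0, 0.0),   # sale_id, product_id, quantity, price, discount
        (2, 102, 1, 25.0, 5.0),
        (3, 101, 4, 10.0, 0.0),
    ],
    ["sale_id", "product_id", "quantity", "price", "discount"],
)

products_df = spark.createDataFrame(
    [
        (101, "Widget", "Hardware"),      # id, name, category
        (102, "Gadget", "Electronics"),
    ],
    ["id", "name", "category"],
)

sales_df.show()
products_df.show()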

The PySpark code for easy reference:

Sequential Job

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# Read the first CSV file - sales data
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://<s3_bucket>/source/sales_data.csv")

# Read the second CSV file - product data
products_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://<s3_bucket>/source/product_data.csv")

# Join the dataframes
joined_df = sales_df.join(
    products_df,
    sales_df.product_id == products_df.id,
    "inner"
)

# Perform aggregation
result_df = joined_df.groupBy("category") \
    .agg(
        sum(col("quantity") * col("price")).alias("total_sales"),
        sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
    )

# Write results to parquet
result_df.write \
    .mode("overwrite") \
    .parquet("s3://<s3_bucket>/target/sales_analysis/sequential/")

print("sequential Process completed successfully!")


job.commit()
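
Once the sequential job has run, the Parquet output should hold one row per product category with the two aggregated columns. A quick sanity check, assuming a SparkSession named spark is available (for example in a Glue notebook) and using the same placeholder bucket as above:

# Read the Parquet output back and inspect it
output_df = spark.read.parquet("s3://<s3_bucket>/target/sales_analysis/sequential/")
output_df.printSchema()   # expected columns: category, total_sales, discounted_sales_count
output_df.show()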

Functional Job

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

# Functional style - organizing code into functions

def create_spark_session(app_name):
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # Return the Glue job as well so the caller can commit it when finished
    return spark, job

def read_csv_data(spark, path):
    return spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(path)

def join_dataframes(sales_df, products_df):
    return sales_df.join(
        products_df,
        sales_df.product_id == products_df.id,
        "inner"
    )

def aggregate_sales_data(joined_df):
    return joined_df.groupBy("category") \
        .agg(
            sum(col("quantity") * col("price")).alias("total_sales"),
            sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
        )

def write_parquet_data(df, output_path):
    df.write \
        .mode("overwrite") \
        .parquet(output_path)

def main():
    # Initialize Spark and the Glue job
    spark, job = create_spark_session("Functional PySpark Example")
    # Read data
    sales_df = read_csv_data(spark, "s3://<s3_bucket>/source/sales_data.csv")
    products_df = read_csv_data(spark, "s3://<s3_bucket>/source/product_data.csv")
    # Process data
    joined_df = join_dataframes(sales_df, products_df)
    result_df = aggregate_sales_data(joined_df)
    # Write results
    write_parquet_data(result_df, "s3://<s3_bucket>/target/sales_analysis/functional/")
    print("Process completed successfully!")
    

if __name__ == "__main__":
    main()
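
A practical payoff of the functional version is that the transformation functions can be exercised on their own, without AWS Glue at all. Below is a rough sketch of such a check: it copies aggregate_sales_data into a plain local script and runs it against a few hand-made rows, so both the sample values and the expected numbers are illustrative only.

# Sketch: testing aggregate_sales_data locally, outside Glue
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when

def aggregate_sales_data(joined_df):
    return joined_df.groupBy("category") \
        .agg(
            sum(col("quantity") * col("price")).alias("total_sales"),
            sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
        )

spark = SparkSession.builder.master("local[1]").appName("aggregate-test-sketch").getOrCreate()

joined_df = spark.createDataFrame(
    [
        ("Hardware", 2, 10.0, 0.0),
        ("Hardware", 4, 10.0, 2.0),
        ("Electronics", 1, 25.0, 5.0),
    ],
    ["category", "quantity", "price", "discount"],
)

result = {row["category"]: row for row in aggregate_sales_data(joined_df).collect()}
assert result["Hardware"]["total_sales"] == 60.0
assert result["Hardware"]["discounted_sales_count"] == 1
assert result["Electronics"]["discounted_sales_count"] == 1
print("aggregate_sales_data behaves as expected on the sample rows")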

Object Oriented Programming Job

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

# Object-Oriented style - using classes to organize code

class SalesAnalyzer:
    def __init__(self, app_name="OOP PySpark Example"):
        self.spark, self.job = self._create_spark_session(app_name)
    
    def _create_spark_session(self, app_name):
        ## @params: [JOB_NAME]
        args = getResolvedOptions(sys.argv, ['JOB_NAME'])
        sc = SparkContext()
        glueContext = GlueContext(sc)
        spark = glueContext.spark_session
        job = Job(glueContext)
        job.init(args['JOB_NAME'], args)
        # Keep a handle on the Glue job so it can be committed after processing
        return spark, job
    
    def read_csv_data(self, path):
        return self.spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(path)
    
    def join_dataframes(self, sales_df, products_df):
        return sales_df.join(
            products_df,
            sales_df.product_id == products_df.id,
            "inner"
        )
    
    def aggregate_sales_data(self, joined_df):
        return joined_df.groupBy("category") \
            .agg(
                sum(col("quantity") * col("price")).alias("total_sales"),
                sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
            )
    
    def write_parquet_data(self, df, output_path):
        df.write \
            .mode("overwrite") \
            .parquet(output_path)
    
    def process_sales_data(self, sales_path, products_path, output_path):
        # Read data
        sales_df = self.read_csv_data(sales_path)
        products_df = self.read_csv_data(products_path)
        
        # Process data
        joined_df = self.join_dataframes(sales_df, products_df)
        result_df = self.aggregate_sales_data(joined_df)
        
        # Write results
        self.write_parquet_data(result_df, output_path)
        
        return "Process completed successfully!"

def main():
    # Create analyzer instance
    analyzer = SalesAnalyzer()
    
    # Process data
    result = analyzer.process_sales_data(
        "s3://<s3_bucket>/source/sales_data.csv",
        "s3://<s3_bucket>/source/product_data.csv",
        "s3://<s3_bucket>/target/sales_analysis/oop/"
    )
    
    print(result)

if __name__ == "__main__":
    main()
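
The payoff of the class-based version is that behaviour can be swapped out without touching the rest of the pipeline. As a purely hypothetical example (not part of the video), a subclass could override only the writer to partition the output by category:

# Hypothetical subclass: same pipeline, but the Parquet output is partitioned by category
class PartitionedSalesAnalyzer(SalesAnalyzer):
    def write_parquet_data(self, df, output_path):
        df.write \
            .mode("overwrite") \
            .partitionBy("category") \
            .parquet(output_path)

# Usage would mirror main() above:
# analyzer = PartitionedSalesAnalyzer()
# analyzer.process_sales_data(
#     "s3://<s3_bucket>/source/sales_data.csv",
#     "s3://<s3_bucket>/source/product_data.csv",
#     "s3://<s3_bucket>/target/sales_analysis/oop_partitioned/"
# )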
    
