Many people who come to PySpark from a SQL background fall into the habit of writing PySpark code in a purely sequential manner. That is because they are used to working in SQL, where one query simply follows the other.

But in PySpark, you should not ignore the object-oriented programming style. In this video I share a very easy way to move from writing PySpark code sequentially, to a functional style, and finally to an OOP style.
For demonstration purposes, I create an ETL pipeline in AWS Glue using PySpark that reads data from two CSV files. After applying transformations such as joins and aggregations, it writes the data back to S3 in Parquet format.

Watch the video now
The PySpark code for easy reference:
Sequential Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the first CSV file - sales data
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://<s3_bucket>/source/sales_data.csv")

# Read the second CSV file - product data
products_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://<s3_bucket>/source/product_data.csv")

# Join the dataframes
joined_df = sales_df.join(
    products_df,
    sales_df.product_id == products_df.id,
    "inner"
)

# Perform aggregation
result_df = joined_df.groupBy("category") \
    .agg(
        sum(col("quantity") * col("price")).alias("total_sales"),
        sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
    )

# Write results to parquet
result_df.write \
    .mode("overwrite") \
    .parquet("s3://<s3_bucket>/target/sales_analysis/sequential/")

print("Sequential process completed successfully!")

job.commit()
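As a small aside that is not part of the video: the same sequential steps can also be chained with DataFrame.transform (available in PySpark 3.0+), which is a natural stepping stone towards the functional style shown next. This is only a sketch; the helper names join_products and total_sales_by_category are illustrative, and it assumes sales_df and products_df from the listing above.

from pyspark.sql import functions as F

# Illustrative helper: returns a function that joins an incoming sales
# DataFrame to the given products DataFrame (name is an assumption).
def join_products(products_df):
    def _join(sales_df):
        return sales_df.join(products_df, sales_df.product_id == products_df.id, "inner")
    return _join

# Illustrative helper: same aggregation as in the listing above.
def total_sales_by_category(df):
    return df.groupBy("category").agg(
        F.sum(F.col("quantity") * F.col("price")).alias("total_sales"),
        F.sum(F.when(F.col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count"),
    )

# Chain the named steps instead of stacking intermediate variables.
result_df = (
    sales_df
    .transform(join_products(products_df))
    .transform(total_sales_by_category)
)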
Functional Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

# Functional style - organizing code into functions

def create_spark_session(app_name):
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    return spark

def read_csv_data(spark, path):
    return spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(path)

def join_dataframes(sales_df, products_df):
    return sales_df.join(
        products_df,
        sales_df.product_id == products_df.id,
        "inner"
    )

def aggregate_sales_data(joined_df):
    return joined_df.groupBy("category") \
        .agg(
            sum(col("quantity") * col("price")).alias("total_sales"),
            sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
        )

def write_parquet_data(df, output_path):
    df.write \
        .mode("overwrite") \
        .parquet(output_path)

def main():
    # Initialize Spark
    spark = create_spark_session("Functional PySpark Example")

    # Read data
    sales_df = read_csv_data(spark, "s3://<s3_bucket>/source/sales_data.csv")
    products_df = read_csv_data(spark, "s3://<s3_bucket>/source/product_data.csv")

    # Process data
    joined_df = join_dataframes(sales_df, products_df)
    result_df = aggregate_sales_data(joined_df)

    # Write results
    write_parquet_data(result_df, "s3://<s3_bucket>/target/sales_analysis/functional/")

    print("Process completed successfully!")

if __name__ == "__main__":
    main()
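One practical payoff of the functional split is testability. Below is a minimal local test sketch of my own (not from the video): it assumes the pure transformation functions are kept in a Glue-free module, here called sales_transforms (an assumed name), so they can be exercised with a plain local SparkSession instead of a Glue job.

# Sketch: unit test for the transformation functions, assuming they live in a
# module named sales_transforms that does not import any awsglue packages.
from pyspark.sql import SparkSession
from sales_transforms import join_dataframes, aggregate_sales_data

def test_aggregate_sales_data():
    spark = SparkSession.builder.master("local[1]").appName("sales-test").getOrCreate()

    # Tiny hand-made inputs mirroring the expected CSV columns.
    sales_df = spark.createDataFrame(
        [(1, 101, 2, 10.0, 0.0), (2, 102, 1, 5.0, 1.0)],
        ["sale_id", "product_id", "quantity", "price", "discount"],
    )
    products_df = spark.createDataFrame(
        [(101, "Books"), (102, "Books")],
        ["id", "category"],
    )

    result = aggregate_sales_data(join_dataframes(sales_df, products_df))
    row = result.collect()[0]

    assert row["category"] == "Books"
    assert row["total_sales"] == 25.0          # 2*10.0 + 1*5.0
    assert row["discounted_sales_count"] == 1  # only one row has discount > 0

    spark.stop()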
Object Oriented Programming Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

# Object-Oriented style - using classes to organize code

class SalesAnalyzer:

    def __init__(self, app_name="OOP PySpark Example"):
        self.spark = self._create_spark_session(app_name)

    def _create_spark_session(self, app_name):
        ## @params: [JOB_NAME]
        args = getResolvedOptions(sys.argv, ['JOB_NAME'])
        sc = SparkContext()
        glueContext = GlueContext(sc)
        spark = glueContext.spark_session
        job = Job(glueContext)
        job.init(args['JOB_NAME'], args)
        return spark

    def read_csv_data(self, path):
        return self.spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(path)

    def join_dataframes(self, sales_df, products_df):
        return sales_df.join(
            products_df,
            sales_df.product_id == products_df.id,
            "inner"
        )

    def aggregate_sales_data(self, joined_df):
        return joined_df.groupBy("category") \
            .agg(
                sum(col("quantity") * col("price")).alias("total_sales"),
                sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
            )

    def write_parquet_data(self, df, output_path):
        df.write \
            .mode("overwrite") \
            .parquet(output_path)

    def process_sales_data(self, sales_path, products_path, output_path):
        # Read data
        sales_df = self.read_csv_data(sales_path)
        products_df = self.read_csv_data(products_path)

        # Process data
        joined_df = self.join_dataframes(sales_df, products_df)
        result_df = self.aggregate_sales_data(joined_df)

        # Write results
        self.write_parquet_data(result_df, output_path)

        return "Process completed successfully!"

def main():
    # Create analyzer instance
    analyzer = SalesAnalyzer()

    # Process data
    result = analyzer.process_sales_data(
        "s3://<s3_bucket>/source/sales_data.csv",
        "s3://<s3_bucket>/source/product_data.csv",
        "s3://<s3_bucket>/target/sales_analysis/oop/"
    )

    print(result)

if __name__ == "__main__":
    main()
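If you want to exercise the class outside Glue, one common variation (my own hedged sketch, not shown in the video) is to let the constructor accept an already-built SparkSession and only fall back to creating the Glue session when none is injected. Only the constructor changes; the spark=None parameter is an addition of mine.

# Sketch of a constructor variation for local testing; every other method of
# SalesAnalyzer stays exactly as in the listing above.
class SalesAnalyzer:
    def __init__(self, app_name="OOP PySpark Example", spark=None):
        # Use an injected SparkSession when provided (e.g. a local session in
        # unit tests); otherwise fall back to building the Glue session.
        self.spark = spark if spark is not None else self._create_spark_session(app_name)

# Local usage, with no Glue context required on this path:
# from pyspark.sql import SparkSession
# analyzer = SalesAnalyzer(spark=SparkSession.builder.master("local[1]").getOrCreate())
# result_df = analyzer.aggregate_sales_data(analyzer.join_dataframes(sales_df, products_df))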