Many people who move from a SQL background to PySpark fall into the habit of writing PySpark code in a purely sequential manner. It's because they are used to working in SQL, where one query simply follows the other.

But in PySpark, they should not ignore the Object Oriented Programming style. In this video I share a very easy way to move from writing PySpark code sequentially, to a functional style, and finally to an OOP style.
For demonstration purposes, I will create an ETL pipeline in AWS Glue using PySpark that reads data from 2 CSV files. After applying transformations such as joins and aggregations, it writes the data back to S3 in Parquet format.
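The file layouts themselves are not spelled out in this post, so as a working assumption the transformations below expect roughly the following columns (inferred from the joins and aggregations, not confirmed):

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

# Hypothetical schemas inferred from the code below; the real files may differ
sales_schema = StructType([
    StructField("product_id", IntegerType()),
    StructField("quantity", IntegerType()),
    StructField("discount", DoubleType()),
])
products_schema = StructType([
    StructField("id", IntegerType()),
    StructField("category", StringType()),
    StructField("price", DoubleType()),
])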

Watch the video now
The PySpark code for easy reference:
Sequential Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read the first CSV file - sales data
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://<s3_bucket>/source/sales_data.csv")

# Read the second CSV file - product data
products_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://<s3_bucket>/source/product_data.csv")

# Join the dataframes
joined_df = sales_df.join(
    products_df,
    sales_df.product_id == products_df.id,
    "inner"
)

# Perform aggregation
result_df = joined_df.groupBy("category") \
    .agg(
        sum(col("quantity") * col("price")).alias("total_sales"),
        sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
    )

# Write results to parquet
result_df.write \
    .mode("overwrite") \
    .parquet("s3://<s3_bucket>/target/sales_analysis/sequential/")
print("sequential Process completed successfully!")
job.commit()Functional Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

# Functional style - organizing code into functions
def create_spark_session(app_name):
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # Return the Glue job as well so it can be committed at the end of the run
    return spark, job

def read_csv_data(spark, path):
    return spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(path)

def join_dataframes(sales_df, products_df):
    return sales_df.join(
        products_df,
        sales_df.product_id == products_df.id,
        "inner"
    )

def aggregate_sales_data(joined_df):
    return joined_df.groupBy("category") \
        .agg(
            sum(col("quantity") * col("price")).alias("total_sales"),
            sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
        )

def write_parquet_data(df, output_path):
    df.write \
        .mode("overwrite") \
        .parquet(output_path)

def main():
    # Initialize Spark and the Glue job
    spark, job = create_spark_session("Functional PySpark Example")
    # Read data
    sales_df = read_csv_data(spark, "s3://<s3_bucket>/source/sales_data.csv")
    products_df = read_csv_data(spark, "s3://<s3_bucket>/source/product_data.csv")
    # Process data
    joined_df = join_dataframes(sales_df, products_df)
    result_df = aggregate_sales_data(joined_df)
    # Write results
    write_parquet_data(result_df, "s3://<s3_bucket>/target/sales_analysis/functional/")
    print("Process completed successfully!")
    job.commit()

if __name__ == "__main__":
    main()
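One nice side effect of the functional version: the transformation functions can be smoke-tested locally with a plain SparkSession and a few hand-made rows, without deploying anything to Glue. A minimal sketch, assuming the functions above live in a module (hypothetical name glue_job) and the awsglue libraries it imports are available locally, for example via the AWS Glue Docker image:

from pyspark.sql import SparkSession
from glue_job import join_dataframes, aggregate_sales_data  # hypothetical module name

# Local smoke test for the pure transformation functions (no Glue job run needed)
spark = SparkSession.builder.master("local[*]").appName("sales-test").getOrCreate()

sales_df = spark.createDataFrame(
    [(1, 101, 2, 0.0), (2, 102, 1, 5.0), (3, 101, 1, 0.0)],
    ["sale_id", "product_id", "quantity", "discount"],
)
products_df = spark.createDataFrame(
    [(101, "electronics", 199.99), (102, "books", 12.50)],
    ["id", "category", "price"],
)

result = aggregate_sales_data(join_dataframes(sales_df, products_df))
result.show()  # one row per category with total_sales and discounted_sales_count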
Object Oriented Programming Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, col, when

# Object-Oriented style - using classes to organize code
class SalesAnalyzer:
    def __init__(self, app_name="OOP PySpark Example"):
        self.spark = self._create_spark_session(app_name)

    def _create_spark_session(self, app_name):
        ## @params: [JOB_NAME]
        args = getResolvedOptions(sys.argv, ['JOB_NAME'])
        sc = SparkContext()
        glueContext = GlueContext(sc)
        spark = glueContext.spark_session
        # Keep the Glue job handle on the instance so it can be committed later
        self.job = Job(glueContext)
        self.job.init(args['JOB_NAME'], args)
        return spark

    def read_csv_data(self, path):
        return self.spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(path)

    def join_dataframes(self, sales_df, products_df):
        return sales_df.join(
            products_df,
            sales_df.product_id == products_df.id,
            "inner"
        )

    def aggregate_sales_data(self, joined_df):
        return joined_df.groupBy("category") \
            .agg(
                sum(col("quantity") * col("price")).alias("total_sales"),
                sum(when(col("discount") > 0, 1).otherwise(0)).alias("discounted_sales_count")
            )

    def write_parquet_data(self, df, output_path):
        df.write \
            .mode("overwrite") \
            .parquet(output_path)

    def process_sales_data(self, sales_path, products_path, output_path):
        # Read data
        sales_df = self.read_csv_data(sales_path)
        products_df = self.read_csv_data(products_path)
        # Process data
        joined_df = self.join_dataframes(sales_df, products_df)
        result_df = self.aggregate_sales_data(joined_df)
        # Write results
        self.write_parquet_data(result_df, output_path)
        self.job.commit()
        return "Process completed successfully!"

def main():
    # Create analyzer instance
    analyzer = SalesAnalyzer()
    # Process data
    result = analyzer.process_sales_data(
        "s3://<s3_bucket>/source/sales_data.csv",
        "s3://<s3_bucket>/source/product_data.csv",
        "s3://<s3_bucket>/target/sales_analysis/oop/"
    )
    print(result)

if __name__ == "__main__":
    main()
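As a possible next step (my addition, not something shown in the video), the hard-coded S3 paths in main() could be supplied as Glue job parameters instead, since getResolvedOptions accepts additional argument names beyond JOB_NAME. A sketch of such a main(), assuming the job is configured with --SALES_PATH, --PRODUCTS_PATH and --OUTPUT_PATH arguments:

import sys
from awsglue.utils import getResolvedOptions

def main():
    # Hypothetical job arguments; they must be defined on the Glue job itself
    args = getResolvedOptions(
        sys.argv, ['JOB_NAME', 'SALES_PATH', 'PRODUCTS_PATH', 'OUTPUT_PATH']
    )
    analyzer = SalesAnalyzer()
    result = analyzer.process_sales_data(
        args['SALES_PATH'],
        args['PRODUCTS_PATH'],
        args['OUTPUT_PATH'],
    )
    print(result)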