110+ PySpark Practice Exercises Interview Questions for Beginners and Experienced
PySpark is a powerful framework that enables you to process and analyze large-scale data using Apache Spark with Python. It offers a wide range of transformations and actions to manipulate data efficiently. In this article, we will provide you with 110 practical PySpark exercises for beginners to help you learn PySpark and enhance your data processing skills. Each exercise is accompanied by a step-by-step guide and a practical question to solve. Let's get started!
Exercise 1: Loading Data
Question: Load a CSV file named "data.csv" into a PySpark DataFrame.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Exercise 1").getOrCreate()
# Load data from CSV into a DataFrame
data_df = spark.read.csv("data.csv", header=True, inferSchema=True)
# Show the first few rows of the DataFrame
data_df.show()
Answer: The provided code will create a SparkSession, read the CSV file "data.csv" into a PySpark DataFrame, and display the first few rows of the DataFrame.
Exercise 2: Data Exploration
Question: Display the schema of the DataFrame loaded in the previous exercise.
# Display the schema of the DataFrame
data_df.printSchema()
Answer: The above code will display the schema of the DataFrame, showing the names and data types of its columns.
Exercise 3: Data Filtering
Question: Filter the DataFrame to include only rows where the "age" column is greater than 30.
from pyspark.sql.functions import col
# Filter the DataFrame
filtered_df = data_df.filter(col("age") > 30)
# Show the filtered DataFrame
filtered_df.show()
Answer: The provided code will filter the DataFrame to include only rows where the "age" column is greater than 30 and display the filtered DataFrame.
Exercise 4: Data Transformation
Question: Create a new column named "double_age" that contains twice the value of the "age" column.
from pyspark.sql.functions import expr
# Create the new column
transformed_df = data_df.withColumn("double_age", expr("age * 2"))
# Show the DataFrame with the new column
transformed_df.show()
Answer: The given code will create a new column "double_age" in the DataFrame, containing twice the value of the "age" column, and display the DataFrame with the new column.
Exercise 5: Data Aggregation
Question: Calculate the average age of the individuals in the DataFrame.
from pyspark.sql.functions import avg
# Calculate the average age
avg_age = data_df.select(avg("age")).first()[0]
print("Average Age:", avg_age)
Answer: The provided code will calculate the average age of the individuals in the DataFrame and print the result.
Exercise 6: Data Grouping
Question: Group the DataFrame by the "gender" column and calculate the average age within each group.
# Group by "gender" and calculate the average age
grouped_df = data_df.groupBy("gender").agg(avg("age").alias("average_age"))
# Show the grouped DataFrame
grouped_df.show()
Answer: The given code will group the DataFrame by the "gender" column and calculate the average age within each group, displaying the result.
Exercise 7: Joining DataFrames
Question: Load two CSV files "employees.csv" and "departments.csv" into separate DataFrames. Join the DataFrames on the "department_id" column to get a single DataFrame containing both employee and department information.
# Load employees data from CSV into DataFrame
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True)
# Load departments data from CSV into DataFrame
departments_df = spark.read.csv("departments.csv", header=True, inferSchema=True)
# Join the DataFrames on "department_id"
joined_df = employees_df.join(departments_df, "department_id")
# Show the joined DataFrame
joined_df.show()
Answer: The provided code will load data from two CSV files "employees.csv" and "departments.csv" into separate DataFrames and then perform an inner join on the "department_id" column to combine employee and department information in a single DataFrame, displaying the result.
Exercise 8: Data Deduplication
Question: Remove duplicate rows from the DataFrame and show the deduplicated result.
# Deduplicate the DataFrame
deduplicated_df = joined_df.dropDuplicates()
# Show the deduplicated DataFrame
deduplicated_df.show()
Answer: The given code will remove duplicate rows from the DataFrame and display the deduplicated result.
Exercise 9: Data Imputation
Question: Replace the missing values in the "salary" column with the average salary.
from pyspark.sql.functions import avg
# Calculate the average salary
avg_salary = joined_df.select(avg("salary")).first()[0]
# Replace missing values with average salary
imputed_df = joined_df.na.fill(avg_salary, subset=["salary"])
# Show the DataFrame with imputed values
imputed_df.show()
Answer: The provided code will calculate the average salary from the DataFrame and then replace the missing values in the "salary" column with the calculated average, resulting in a DataFrame with imputed values.
Exercise 10: Data Aggregation with GroupBy
Question: Group the DataFrame by the "department_name" column and calculate the total salary for each department.
# Group by "department_name" and calculate the total salary
salary_by_department_df = joined_df.groupBy("department_name").agg(sum("salary").alias("total_salary"))
# Show the DataFrame with total salary for each department
salary_by_department_df.show()
Answer: The provided code will group the DataFrame by the "department_name" column and calculate the total salary for each department, displaying the result.
Exercise 11: Data Sorting
Question: Sort the DataFrame based on the "age" column in ascending order.
# Sort the DataFrame by "age" in ascending order
sorted_df = joined_df.orderBy("age")
# Show the sorted DataFrame
sorted_df.show()
Answer: The given code will sort the DataFrame based on the "age" column in ascending order and display the sorted DataFrame.
Exercise 12: Data Joining with Different Column Names
Question: Load two CSV files "employees.csv" and "departments.csv" into separate DataFrames. Join the DataFrames on "dept_id" from the "employees" DataFrame and "id" from the "departments" DataFrame to get a single DataFrame containing both employee and department information.
# Load employees data from CSV into DataFrame with custom column names
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True).withColumnRenamed("dept_id", "department_id")
# Load departments data from CSV into DataFrame
departments_df = spark.read.csv("departments.csv", header=True, inferSchema=True)
# Join the DataFrames on different column names
joined_df = employees_df.join(departments_df, employees_df["department_id"] == departments_df["id"])
# Show the joined DataFrame
joined_df.show()
Answer: The provided code will load data from two CSV files "employees.csv" and "departments.csv" into separate DataFrames with custom column names. Then, it will perform an inner join on the "department_id" column from the "employees" DataFrame and the "id" column from the "departments" DataFrame to combine employee and department information in a single DataFrame, displaying the result.
Exercise 13: Data Repartitioning
Question: Repartition the DataFrame into 5 partitions for better parallelism during processing.
# Repartition the DataFrame into 5 partitions
repartitioned_df = joined_df.repartition(5)
# Show the repartitioned DataFrame
repartitioned_df.show()
Answer: The provided code will repartition the DataFrame into 5 partitions, allowing for better parallelism during processing, and display the repartitioned DataFrame.
Exercise 14: Data Window Functions
Question: Calculate the rank of employees based on their salaries within each department.
from pyspark.sql import Window
from pyspark.sql.functions import rank
# Define the window specification
window_spec = Window.partitionBy("department_name").orderBy(joined_df["salary"].desc())
# Calculate the rank of employees based on salaries within each department
ranked_df = joined_df.withColumn("salary_rank", rank().over(window_spec))
# Show the DataFrame with salary ranks
ranked_df.show()
Answer: The given code will calculate the rank of employees based on their salaries within each department and display the DataFrame with the salary ranks.
Exercise 15: Data Filtering with SQL
Question: Filter the DataFrame to include only employees whose salary is above 50000 using SQL syntax.
# Register the DataFrame as a temporary SQL table
joined_df.createOrReplaceTempView("employee_data")
# Perform SQL filtering
filtered_df = spark.sql("SELECT * FROM employee_data WHERE salary > 50000")
# Show the filtered DataFrame
filtered_df.show()
Answer: The provided code will register the DataFrame as a temporary SQL table and then use SQL syntax to filter and select only employees whose salary is above 50000, displaying the filtered DataFrame.
Exercise 16: Data Pivot
Question: Pivot the DataFrame to transform rows into columns based on the "gender" column, and calculate the average salary for each gender.
# Pivot the DataFrame
pivot_df = joined_df.groupBy("department_name").pivot("gender").avg("salary")
# Show the pivoted DataFrame
pivot_df.show()
Answer: The given code will pivot the DataFrame to transform rows into columns based on the "gender" column and calculate the average salary for each gender, displaying the pivoted DataFrame.
Exercise 17: Data Joins - Left Outer Join
Question: Perform a left outer join between the "employees" DataFrame and the "departments" DataFrame on the "department_id" column, and display the combined DataFrame.
# Perform the left outer join
left_outer_join_df = employees_df.join(departments_df, on="department_id", how="left")
# Show the combined DataFrame
left_outer_join_df.show()
Answer: The provided code will perform a left outer join between the "employees" DataFrame and the "departments" DataFrame on the "department_id" column, resulting in a combined DataFrame, and display the result.
Exercise 18: Data Coalesce
Question: Create a new DataFrame with the "first_name" and "last_name" columns coalesced into a single "full_name" column.
# Create the new DataFrame with coalesced column
coalesced_df = joined_df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
# Show the DataFrame with the new "full_name" column
coalesced_df.show()
Answer: The given code will create a new DataFrame with the "first_name" and "last_name" columns coalesced into a single "full_name" column, displaying the DataFrame with the new column.
Exercise 19: Data Aggregation - Count and GroupBy
Question: Count the number of employees in each department and display the result.
# Count the number of employees in each department
employee_count_df = joined_df.groupBy("department_name").count()
# Show the DataFrame with employee counts
employee_count_df.show()
Answer: The provided code will count the number of employees in each department and display the result in a DataFrame.
Exercise 20: Data Sampling
Question: Randomly sample 10% of the data from the DataFrame.
# Randomly sample 10% of the data
sampled_df = joined_df.sample(0.1)
# Show the sampled DataFrame
sampled_df.show()
Answer: The given code will randomly sample 10% of the data from the DataFrame and display the sampled DataFrame.
Exercise 21: Data Window Functions - Lag
Question: Create a new column "lag_salary" that contains the previous salary of each employee based on the "salary" column, within each department.
from pyspark.sql import Window
from pyspark.sql.functions import lag
# Define the window specification
window_spec = Window.partitionBy("department_name").orderBy("employee_id")
# Create the new column with lagged salary
lagged_df = joined_df.withColumn("lag_salary", lag("salary").over(window_spec))
# Show the DataFrame with the new column
lagged_df.show()
Answer: The provided code will create a new column "lag_salary" that contains the previous salary of each employee based on the "salary" column, within each department, and display the DataFrame with the new column.
Exercise 22: Data UDF - User-Defined Function
Question: Define a user-defined function (UDF) to categorize employees based on their salary. The function should return "High" for salaries greater than 75000, "Medium" for salaries between 50000 and 75000, and "Low" for salaries below 50000.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define the UDF function
def categorize_salary(salary):
if salary > 75000:
return "High"
elif 50000 <= salary <= 75000:
return "Medium"
else:
return "Low"
# Register the UDF
categorize_salary_udf = udf(categorize_salary, StringType())
# Apply the UDF to create a new column
categorized_df = joined_df.withColumn("salary_category", categorize_salary_udf("salary"))
# Show the DataFrame with the new column
categorized_df.show()
Answer: The given code will define a user-defined function (UDF) to categorize employees based on their salary and create a new column "salary_category" in the DataFrame with the salary categories, displaying the result.
Exercise 23: Data Filtering - Multiple Conditions
Question: Filter the DataFrame to include only male employees who have a salary above 60000.
# Filter the DataFrame with multiple conditions
filtered_df = joined_df.filter((joined_df["gender"] == "Male") & (joined_df["salary"] > 60000))
# Show the filtered DataFrame
filtered_df.show()
Answer: The provided code will filter the DataFrame to include only male employees with a salary above 60000, displaying the filtered DataFrame.
Exercise 24: Data Aggregation - Maximum Salary
Question: Calculate the maximum salary from the DataFrame.
# Calculate the maximum salary
max_salary = joined_df.agg({"salary": "max"}).collect()[0][0]
# Display the maximum salary
print("Maximum Salary:", max_salary)
Answer: The given code will calculate the maximum salary from the DataFrame and display the result.
Exercise 25: Data Sampling - Stratified Sampling
Question: Perform stratified sampling on the DataFrame to get a sample of 20% for each department.
# Perform stratified sampling on the DataFrame
stratified_sample_df = joined_df.sampleBy("department_name", fractions={"HR": 0.2, "Engineering": 0.2, "Marketing": 0.2})
# Show the stratified sampled DataFrame
stratified_sample_df.show()
Answer: The provided code will perform stratified sampling on the DataFrame to get a sample of 20% for each department and display the stratified sampled DataFrame.
Exercise 26: Blob Storage Connection
Question: Connect to Azure Blob Storage and read a CSV file named "data.csv" from the container "mycontainer".
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("BlobStorageExercise").getOrCreate()
# Connect to Azure Blob Storage
storage_account_name = "your_storage_account_name"
storage_account_key = "your_storage_account_key"
container_name = "mycontainer"
spark.conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", storage_account_key)
# Read the CSV file from the container
csv_file_path = "wasbs://" + container_name + "@{}.blob.core.windows.net/data.csv".format(storage_account_name)
data_df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
# Show the DataFrame with the data
data_df.show()
Answer: The provided code will connect to Azure Blob Storage using the provided storage account name and key. It will then read the CSV file "data.csv" from the container "mycontainer" and display the DataFrame with the data.
Exercise 27: Blob Storage Write
Question: Write the DataFrame "data_df" to a new CSV file named "output.csv" in the container "mycontainer".
# Write the DataFrame to CSV file
output_file_path = "wasbs://" + container_name + "@{}.blob.core.windows.net/output.csv".format(storage_account_name)
data_df.write.csv(output_file_path, header=True, mode="overwrite")
# Print success message
print("DataFrame successfully written to Blob Storage!")
Answer: The given code will write the DataFrame "data_df" to a new CSV file named "output.csv" in the container "mycontainer". It will overwrite the file if it already exists and print a success message upon completion.
Exercise 28: Blob Storage List Containers
Question: List all the containers in the Azure Blob Storage account.
from azure.storage.blob import BlobServiceClient
# Create a BlobServiceClient
connection_string = "your_blob_storage_connection_string"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# List all containers
containers = blob_service_client.list_containers()
# Print the container names
for container in containers:
print(container.name)
Answer: The provided code will create a BlobServiceClient using the given connection string and list all the containers in the Azure Blob Storage account, displaying their names.
Exercise 29: Key Vault Integration
Question: Integrate Azure Key Vault with PySpark to securely access the secret "database_password" stored in Key Vault.
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("KeyVaultIntegrationExercise").getOrCreate()
# Initialize DefaultAzureCredential
credential = DefaultAzureCredential()
# Create a SecretClient to access Key Vault
vault_name = "your_key_vault_name"
vault_url = "https://" + vault_name + ".vault.azure.net"
secret_client = SecretClient(vault_url=vault_url, credential=credential)
# Access the secret from Key Vault
secret_name = "database_password"
database_password = secret_client.get_secret(secret_name).value
# Use the secret in your PySpark application
# For example, set it as a configuration property
spark.conf.set("database.password", database_password)
Answer: The provided code will integrate Azure Key Vault with PySpark using the DefaultAzureCredential to securely access the secret "database_password" stored in Key Vault. The retrieved secret can be used in the PySpark application, for example, by setting it as a configuration property.
Exercise 30: Data Lake Storage Connection
Question: Connect to Azure Data Lake Storage Gen2 and read a Parquet file named "data.parquet" from the file system.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("DataLakeStorageExercise").getOrCreate()
# Connect to Azure Data Lake Storage Gen2
storage_account_name = "your_storage_account_name"
file_system_name = "your_file_system_name"
storage_account_key = "your_storage_account_key"
spark.conf.set("fs.azure.account.auth.type." + storage_account_name + ".dfs.core.windows.net", "SharedKey")
spark.conf.set("fs.azure.account.key." + storage_account_name + ".dfs.core.windows.net", storage_account_key)
spark.conf.set("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
# Read the Parquet file from the file system
parquet_file_path = "adl://" + storage_account_name + ".azuredatalakestore.net/" + file_system_name + "/data.parquet"
data_df = spark.read.parquet(parquet_file_path)
# Show the DataFrame with the data
data_df.show()
Answer: The provided code will connect to Azure Data Lake Storage Gen2 using the provided storage account name, file system name, and storage account key. It will then read the Parquet file "data.parquet" from the file system and display the DataFrame with the data.
Exercise 31: Data Lake Storage Write
Question: Write the DataFrame "data_df" to a new Parquet file named "output.parquet" in the file system.
# Write the DataFrame to Parquet file
output_file_path = "adl://" + storage_account_name + ".azuredatalakestore.net/" + file_system_name + "/output.parquet"
data_df.write.parquet(output_file_path, mode="overwrite")
# Print success message
print("DataFrame successfully written to Data Lake Storage Gen2!")
Answer: The given code will write the DataFrame "data_df" to a new Parquet file named "output.parquet" in the file system and print a success message upon completion.
Exercise 32: Data Lake Storage List Files
Question: List all the files in the Azure Data Lake Storage Gen2 file system.
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
# Initialize DefaultAzureCredential
credential = DefaultAzureCredential()
# Create a DataLakeServiceClient to access Data Lake Storage
storage_account_name = "your_storage_account_name"
file_system_name = "your_file_system_name"
storage_account_url = "https://" + storage_account_name + ".dfs.core.windows.net"
data_lake_client = DataLakeServiceClient(account_url=storage_account_url, credential=credential)
# Get the file system client
file_system_client = data_lake_client.get_file_system_client(file_system=file_system_name)
# List all the files in the file system
files = file_system_client.get_paths()
# Print the file names
for file in files:
print(file.name)
Answer: The provided code will create a DataLakeServiceClient using the DefaultAzureCredential to access Azure Data Lake Storage Gen2. It will then list all the files in the specified file system and display their names.
Exercise 33: Data Lake Storage Rename File
Question: Rename the file "data.parquet" to "renamed_data.parquet" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for the source file
source_file_path = "data.parquet"
source_file_client = file_system_client.get_file_client(file_path=source_file_path)
# Get the file client for the destination file
destination_file_path = "renamed_data.parquet"
destination_file_client = file_system_client.get_file_client(file_path=destination_file_path)
# Rename the file
destination_file_client.create_file()
destination_file_client.start_copy_from_url(source_file_client.url)
source_file_client.delete()
Answer: The given code will rename the file "data.parquet" to "renamed_data.parquet" in the Azure Data Lake Storage Gen2 file system by copying the file to the new name and then deleting the original file.
Exercise 34: Data Lake Storage Delete File
Question: Delete the file "unwanted_data.parquet" from the Azure Data Lake Storage Gen2 file system.
# Get the file client for the file to be deleted
file_to_delete_path = "unwanted_data.parquet"
file_to_delete_client = file_system_client.get_file_client(file_path=file_to_delete_path)
# Delete the file
file_to_delete_client.delete()
Answer: The provided code will delete the file "unwanted_data.parquet" from the Azure Data Lake Storage Gen2 file system.
Exercise 35: Data Lake Storage Access Control
Question: Set the access control for the file "sensitive_data.csv" in the Azure Data Lake Storage Gen2 file system to restrict access to specific users and groups.
from azure.storage.filedatalake import ContentSettings, AccessControl
# Get the file client for the file to set access control
file_to_set_access_path = "sensitive_data.csv"
file_to_set_access_client = file_system_client.get_file_client(file_path=file_to_set_access_path)
# Define the access control list
access_control_list = AccessControl(
permissions="rw-------",
owner="user1",
group="group1"
)
# Set the access control
file_to_set_access_client.set_access_control(access_control_list=access_control_list)
Answer: The given code will set the access control for the file "sensitive_data.csv" in the Azure Data Lake Storage Gen2 file system to restrict access to specific users and groups. The file will have read and write permissions for "user1" and "group1," while others will have no access.
Exercise 36: Data Lake Storage Append to File
Question: Append new data to the file "data_log.txt" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for the file to append data
file_to_append_path = "data_log.txt"
file_to_append_client = file_system_client.get_file_client(file_path=file_to_append_path)
# Define the new data to append
new_data = b"New log entry: Some data\n"
# Append the new data to the file
file_to_append_client.append_data(data=new_data, offset=0, length=len(new_data))
file_to_append_client.flush_data(len(new_data))
Answer: The provided code will append the new data "New log entry: Some data" to the file "data_log.txt" in the Azure Data Lake Storage Gen2 file system.
Exercise 37: Data Lake Storage Read File Range
Question: Read a specific range of bytes from the file "large_data.txt" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for the file to read data range
file_to_read_path = "large_data.txt"
file_to_read_client = file_system_client.get_file_client(file_path=file_to_read_path)
# Define the range of bytes to read
start_byte = 1000
end_byte = 1999
# Read the specified range of bytes from the file
file_range = file_to_read_client.read_file_range(offset=start_byte, length=end_byte - start_byte + 1)
print(file_range.readall())
Answer: The given code will read the specified range of bytes (from 1000 to 1999) from the file "large_data.txt" in the Azure Data Lake Storage Gen2 file system and display the content.
Exercise 38: Data Lake Storage Move File
Question: Move the file "data_old.csv" to a new location "archive/data_old.csv" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for the source file
source_file_path = "data_old.csv"
source_file_client = file_system_client.get_file_client(file_path=source_file_path)
# Get the file client for the destination file
destination_file_path = "archive/data_old.csv"
destination_file_client = file_system_client.get_file_client(file_path=destination_file_path)
# Move the file to the new location
destination_file_client.create_file()
destination_file_client.start_copy_from_url(source_file_client.url)
source_file_client.delete()
Answer: The provided code will move the file "data_old.csv" to the new location "archive/data_old.csv" in the Azure Data Lake Storage Gen2 file system by copying the file to the new location and then deleting the original file.
Exercise 39: Data Lake Storage Directory Operations
Question: Create a new directory named "logs" in the Azure Data Lake Storage Gen2 file system and list all directories in the file system.
# Create a new directory named "logs"
new_directory_path = "logs"
file_system_client.create_directory(new_directory_path)
# List all directories in the file system
directories = file_system_client.get_paths(directory=True)
# Print the directory names
for directory in directories:
print(directory.name)
Answer: The given code will create a new directory named "logs" in the Azure Data Lake Storage Gen2 file system and then list all directories in the file system, displaying their names.
Exercise 40: Data Lake Storage Access Control - ACL
Question: Set Access Control Lists (ACLs) for the directory "logs" in the Azure Data Lake Storage Gen2 file system to grant read access to "user1" and read/write access to "group1".
from azure.storage.filedatalake import AccessControl
# Get the directory client for which to set ACL
directory_path = "logs"
directory_client = file_system_client.get_directory_client(directory_path=directory_path)
# Define the ACLs
acl = AccessControl()
acl.grant_permission("user1", "r")
acl.grant_permission("group1", "rw")
# Set the ACLs for the directory
directory_client.set_access_control(acl=acl)
Answer: The provided code will set Access Control Lists (ACLs) for the directory "logs" in the Azure Data Lake Storage Gen2 file system to grant read access to "user1" and read/write access to "group1".
Exercise 41: Data Lake Storage File Metadata
Question: Set custom metadata for the file "document.docx" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for which to set metadata
file_path = "document.docx"
file_client = file_system_client.get_file_client(file_path=file_path)
# Define the custom metadata
metadata = {"author": "John Doe", "department": "Marketing"}
# Set the custom metadata for the file
file_client.set_metadata(metadata=metadata)
Answer: The given code will set custom metadata (author and department) for the file "document.docx" in the Azure Data Lake Storage Gen2 file system.
Exercise 42: Data Lake Storage Read Metadata
Question: Read and display the metadata of the file "report.pdf" in the Azure Data Lake Storage Gen2 file system.
# Get the file client to read metadata
file_path = "report.pdf"
file_client = file_system_client.get_file_client(file_path=file_path)
# Read the metadata of the file
metadata = file_client.get_metadata()
# Display the metadata
print("Metadata of the file:", metadata)
Answer: The provided code will read and display the metadata of the file "report.pdf" in the Azure Data Lake Storage Gen2 file system.
Exercise 43: Data Lake Storage Directory Properties
Question: Get and display the properties of the directory "data" in the Azure Data Lake Storage Gen2 file system, such as creation time and last modified time.
# Get the directory client to read properties
directory_path = "data"
directory_client = file_system_client.get_directory_client(directory_path=directory_path)
# Get the properties of the directory
directory_properties = directory_client.get_properties()
# Display the directory properties
print("Directory Properties:", directory_properties)
Answer: The given code will get and display the properties of the directory "data" in the Azure Data Lake Storage Gen2 file system, including creation time and last modified time.
Exercise 44: Data Lake Storage Upload File
Question: Upload a local file "data.csv" to the Azure Data Lake Storage Gen2 file system.
# Upload a local file to Data Lake Storage
local_file_path = "/path/to/local/data.csv"
file_to_upload_path = "data.csv"
file_client = file_system_client.get_file_client(file_path=file_to_upload_path)
# Upload the file
with open(local_file_path, "rb") as local_file:
file_client.upload_data(local_file.read(), overwrite=True)
Answer: The provided code will upload the local file "data.csv" to the Azure Data Lake Storage Gen2 file system, overwriting the file if it already exists.
Exercise 45: Data Lake Storage Download File
Question: Download the file "report.pdf" from the Azure Data Lake Storage Gen2 file system to a local directory.
# Download a file from Data Lake Storage to a local directory
file_path = "report.pdf"
file_client = file_system_client.get_file_client(file_path=file_path)
local_download_path = "/path/to/local/download/report.pdf"
with open(local_download_path, "wb") as local_file:
file_data = file_client.download_file()
local_file.write(file_data.readall())
Answer: The given code will download the file "report.pdf" from the Azure Data Lake Storage Gen2 file system to a local directory.
Exercise 46: Data Lake Storage Delete Directory
Question: Delete the directory "old_data" from the Azure Data Lake Storage Gen2 file system.
# Get the directory client for the directory to be deleted
directory_to_delete_path = "old_data"
directory_to_delete_client = file_system_client.get_directory_client(directory_path=directory_to_delete_path)
# Delete the directory
directory_to_delete_client.delete()
Answer: The provided code will delete the directory "old_data" from the Azure Data Lake Storage Gen2 file system.
Exercise 47: Data Lake Storage Access Tier
Question: Set the access tier for the file "large_data.parquet" to "Hot" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for which to set the access tier
file_path = "large_data.parquet"
file_client = file_system_client.get_file_client(file_path=file_path)
# Set the access tier to "Hot"
file_client.set_access_tier("Hot")
Answer: The given code will set the access tier for the file "large_data.parquet" to "Hot" in the Azure Data Lake Storage Gen2 file system.
Exercise 48: Data Lake Storage List Paths
Question: List all files and directories present in the Azure Data Lake Storage Gen2 file system.
# List all files and directories in the file system
paths = file_system_client.get_paths()
# Print the file and directory names
for path in paths:
print(path.name)
Answer: The provided code will list all the files and directories present in the Azure Data Lake Storage Gen2 file system and display their names.
Exercise 49: Data Lake Storage Move Directory
Question: Move the directory "data_old" to a new location "archive/data_old" in the Azure Data Lake Storage Gen2 file system.
# Get the source and destination directory clients
source_directory_path = "data_old"
destination_directory_path = "archive/data_old"
source_directory_client = file_system_client.get_directory_client(directory_path=source_directory_path)
destination_directory_client = file_system_client.get_directory_client(directory_path=destination_directory_path)
# Move the directory to the new location
destination_directory_client.create_directory()
destination_directory_client.start_copy_from_url(source_directory_client.url)
source_directory_client.delete()
Answer: The given code will move the directory "data_old" to the new location "archive/data_old" in the Azure Data Lake Storage Gen2 file system by copying the directory to the new location and then deleting the original directory.
Exercise 50: Data Lake Storage Read File Properties
Question: Read and display the properties of the file "document.docx" in the Azure Data Lake Storage Gen2 file system.
# Get the file client to read properties
file_path = "document.docx"
file_client = file_system_client.get_file_client(file_path=file_path)
# Get the properties of the file
file_properties = file_client.get_properties()
# Display the file properties
print("File Properties:", file_properties)
Answer: The provided code will read and display the properties of the file "document.docx" in the Azure Data Lake Storage Gen2 file system, including creation time, last modified time, size, etc.
Exercise 51: Data Lake Storage Set Expiry Time
Question: Set an expiry time for the file "data.csv" in the Azure Data Lake Storage Gen2 file system so that it is automatically deleted after a specific period.
from datetime import datetime, timedelta
from azure.storage.filedatalake import ContentSettings
# Get the file client for which to set the expiry time
file_path = "data.csv"
file_client = file_system_client.get_file_client(file_path=file_path)
# Calculate the expiry time (e.g., 7 days from now)
expiry_time = datetime.utcnow() + timedelta(days=7)
# Set the expiry time as the content settings for the file
content_settings = ContentSettings()
content_settings.expires_on = expiry_time
file_client.set_content_settings(content_settings)
Answer: The given code will set an expiry time for the file "data.csv" in the Azure Data Lake Storage Gen2 file system. The file will be automatically deleted after the specified period (e.g., 7 days from the current date).
Exercise 52: Data Lake Storage Access Control - POSIX Permissions
Question: Set POSIX-style permissions for the file "data.log" in the Azure Data Lake Storage Gen2 file system to grant read and write access to the owner, read-only access to the group, and no access to others.
# Get the file client for which to set POSIX permissions
file_path = "data.log"
file_client = file_system_client.get_file_client(file_path=file_path)
# Define the POSIX-style permissions
permissions = "rw-r-----" # Owner has read and write access, group has read-only access, others have no access
# Set the POSIX permissions for the file
file_client.set_permissions(permissions=permissions)
Answer: The provided code will set POSIX-style permissions for the file "data.log" in the Azure Data Lake Storage Gen2 file system to grant read and write access to the owner, read-only access to the group, and no access to others.
Exercise 53: Data Lake Storage Lease File
Question: Acquire a lease on the file "video.mp4" in the Azure Data Lake Storage Gen2 file system to prevent other processes from modifying the file.
# Get the file client to acquire a lease
file_path = "video.mp4"
file_client = file_system_client.get_file_client(file_path=file_path)
# Acquire a lease on the file
lease_id = file_client.acquire_lease()
Answer: The given code will acquire a lease on the file "video.mp4" in the Azure Data Lake Storage Gen2 file system, preventing other processes from modifying the file until the lease is released.
Exercise 54: Data Lake Storage Copy File
Question: Copy the file "data.csv" from one location to another location "backup/data.csv" in the Azure Data Lake Storage Gen2 file system.
# Get the source and destination file clients
source_file_path = "data.csv"
destination_file_path = "backup/data.csv"
source_file_client = file_system_client.get_file_client(file_path=source_file_path)
destination_file_client = file_system_client.get_file_client(file_path=destination_file_path)
# Copy the file to the new location
destination_file_client.start_copy_from_url(source_file_client.url)
Answer: The given code will copy the file "data.csv" from one location to another location "backup/data.csv" in the Azure Data Lake Storage Gen2 file system.
Exercise 55: Data Lake Storage Create Empty File
Question: Create an empty file named "new_file.txt" in the Azure Data Lake Storage Gen2 file system.
# Get the file client to create an empty file
file_path = "new_file.txt"
file_client = file_system_client.get_file_client(file_path=file_path)
# Create an empty file
file_client.create_file()
Answer: The provided code will create an empty file named "new_file.txt" in the Azure Data Lake Storage Gen2 file system.
Exercise 56: Data Lake Storage Set Access Control Recursive
Question: Set access control recursively for all files and directories within the "data" directory in the Azure Data Lake Storage Gen2 file system to grant read access to "user1" and read/write access to "group1".
from azure.storage.filedatalake import AccessControl
# Get the directory client for which to set access control recursively
directory_path = "data"
directory_client = file_system_client.get_directory_client(directory_path=directory_path)
# Define the ACLs
acl = AccessControl()
acl.grant_permission("user1", "r")
acl.grant_permission("group1", "rw")
# Set the access control recursively for all files and directories within the directory
directory_client.set_access_control(acl=acl, recursive=True)
Answer: The provided code will set access control recursively for all files and directories within the "data" directory in the Azure Data Lake Storage Gen2 file system. It will grant read access to "user1" and read/write access to "group1".
Exercise 57: Data Lake Storage Rename Directory
Question: Rename the directory "logs" to "old_logs" in the Azure Data Lake Storage Gen2 file system.
# Get the directory client for the source directory
source_directory_path = "logs"
source_directory_client = file_system_client.get_directory_client(directory_path=source_directory_path)
# Get the directory client for the destination directory
destination_directory_path = "old_logs"
destination_directory_client = file_system_client.get_directory_client(directory_path=destination_directory_path)
# Rename the directory
destination_directory_client.create_directory()
destination_directory_client.start_copy_from_url(source_directory_client.url)
source_directory_client.delete()
Answer: The given code will rename the directory "logs" to "old_logs" in the Azure Data Lake Storage Gen2 file system by copying the directory to the new name and then deleting the original directory.
Exercise 58: Data Lake Storage Copy Directory
Question: Copy the entire directory "source_data" to a new location "backup/source_data" in the Azure Data Lake Storage Gen2 file system.
# Get the source and destination directory clients
source_directory_path = "source_data"
destination_directory_path = "backup/source_data"
source_directory_client = file_system_client.get_directory_client(directory_path=source_directory_path)
destination_directory_client = file_system_client.get_directory_client(directory_path=destination_directory_path)
# Copy the directory to the new location
destination_directory_client.create_directory()
destination_directory_client.start_copy_from_url(source_directory_client.url)
Answer: The given code will copy the entire directory "source_data" to a new location "backup/source_data" in the Azure Data Lake Storage Gen2 file system.
Exercise 59: Data Lake Storage File Expiration
Question: Set an expiration time for the file "document.pdf" in the Azure Data Lake Storage Gen2 file system so that it is automatically deleted after a specific date and time.
from datetime import datetime
from azure.storage.filedatalake import ContentSettings
# Get the file client for which to set the expiration time
file_path = "document.pdf"
file_client = file_system_client.get_file_client(file_path=file_path)
# Define the expiration time (e.g., "2023-12-31 23:59:59")
expiration_time = datetime(2023, 12, 31, 23, 59, 59)
# Set the expiration time as the content settings for the file
content_settings = ContentSettings()
content_settings.expires_on = expiration_time
file_client.set_content_settings(content_settings)
Answer: The given code will set an expiration time for the file "document.pdf" in the Azure Data Lake Storage Gen2 file system. The file will be automatically deleted after the specified date and time (e.g., "2023-12-31 23:59:59").
Exercise 60: Data Lake Storage Get Directory Metadata
Question: Read and display the metadata of the directory "logs" in the Azure Data Lake Storage Gen2 file system.
# Get the directory client to read metadata
directory_path = "logs"
directory_client = file_system_client.get_directory_client(directory_path=directory_path)
# Read the metadata of the directory
directory_metadata = directory_client.get_metadata()
# Display the directory metadata
print("Metadata of the directory:", directory_metadata)
Answer: The provided code will read and display the metadata of the directory "logs" in the Azure Data Lake Storage Gen2 file system.
Exercise 61: Data Lake Storage Access Control - Default ACL
Question: Set a default ACL for the Azure Data Lake Storage Gen2 file system to grant read access to "user1" and read/write access to "group1".
from azure.storage.filedatalake import AccessControl
# Get the file system client for which to set the default ACL
file_system_client = DataLakeServiceClient(account_url="https://mydatalake.dfs.core.windows.net/",
credential="my_storage_account_key")
# Define the default ACLs
default_acl = AccessControl()
default_acl.grant_permission("user1", "r")
default_acl.grant_permission("group1", "rw")
# Set the default ACLs for the file system
file_system_client.set_file_system_access_control(default_acl=default_acl)
Answer: The given code will set a default ACL for the Azure Data Lake Storage Gen2 file system to grant read access to "user1" and read/write access to "group1". This means that any new file or directory created within the file system will inherit these permissions by default.
Exercise 62: Data Lake Storage File Append
Question: Append new data to the existing file "data.csv" in the Azure Data Lake Storage Gen2 file system.
# Get the file client for which to append data
file_path = "data.csv"
file_client = file_system_client.get_file_client(file_path=file_path)
# Data to be appended
data_to_append = "New data to be appended"
# Append data to the file
file_client.append_data(data_to_append, offset=file_client.get_file_properties().size)
Answer: The given code will append new data to the existing file "data.csv" in the Azure Data Lake Storage Gen2 file system.
Exercise 63: Data Lake Storage File Read and Write Data
Question: Read the contents of the file "data.txt" from the Azure Data Lake Storage Gen2 file system, perform some data manipulation, and then write the modified data back to the same file.
# Get the file client for which to read and write data
file_path = "data.txt"
file_client = file_system_client.get_file_client(file_path=file_path)
# Read the current data from the file
file_data = file_client.download_file()
current_data = file_data.readall().decode()
# Perform data manipulation (e.g., replace a specific string)
modified_data = current_data.replace("old_value", "new_value")
# Write the modified data back to the file
file_client.upload_data(modified_data, overwrite=True)
Answer: The provided code will read the contents of the file "data.txt" from the Azure Data Lake Storage Gen2 file system, perform data manipulation (e.g., replace a specific string), and then write the modified data back to the same file.
Exercise 64: Key Vault Secret Set
Question: Set a new secret named "database-password" in the Azure Key Vault with the value "mysecretpassword".
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
# Instantiate a SecretClient with the Key Vault URL and credential
key_vault_url = "https://mykeyvault.vault.azure.net/"
credential = DefaultAzureCredential()
secret_client = SecretClient(vault_url=key_vault_url, credential=credential)
# Set a new secret
secret_name = "database-password"
secret_value = "mysecretpassword"
secret_client.set_secret(secret_name, secret_value)
Answer: The given code will set a new secret named "database-password" in the Azure Key Vault with the value "mysecretpassword".
Exercise 65: Key Vault Secret Get
Question: Retrieve the value of the secret named "api-key" from the Azure Key Vault.
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
# Instantiate a SecretClient with the Key Vault URL and credential
key_vault_url = "https://mykeyvault.vault.azure.net/"
credential = DefaultAzureCredential()
secret_client = SecretClient(vault_url=key_vault_url, credential=credential)
# Retrieve the value of the secret
secret_name = "api-key"
retrieved_secret = secret_client.get_secret(secret_name)
# Print the secret value
print("Secret Value:", retrieved_secret.value)
Answer: The provided code will retrieve the value of the secret named "api-key" from the Azure Key Vault and print its value.
Exercise 66: Blob Storage Container Create
Question: Create a new container named "mycontainer" in the Azure Blob Storage account.
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Instantiate a BlobServiceClient with the Blob Storage account URL and credential
blob_storage_url = "https://mystorageaccount.blob.core.windows.net/"
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=blob_storage_url, credential=credential)
# Create a new container
container_name = "mycontainer"
container_client = blob_service_client.create_container(container_name)
Answer: The given code will create a new container named "mycontainer" in the Azure Blob Storage account.
Exercise 67: Blob Storage Upload Blob
Question: Upload a local file "myfile.txt" to the "mycontainer" container in the Azure Blob Storage account.
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Instantiate a BlobServiceClient with the Blob Storage account URL and credential
blob_storage_url = "https://mystorageaccount.blob.core.windows.net/"
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=blob_storage_url, credential=credential)
# Upload a local file to the container
container_name = "mycontainer"
container_client = blob_service_client.get_container_client(container_name)
local_file_path = "myfile.txt"
blob_name = "uploaded_file.txt"
with open(local_file_path, "rb") as local_file:
container_client.upload_blob(name=blob_name, data=local_file)
Answer: The provided code will upload the local file "myfile.txt" to the "mycontainer" container in the Azure Blob Storage account.
Exercise 68: Blob Storage Download Blob
Question: Download the blob named "data.csv" from the container "mycontainer" in the Azure Blob Storage account and save it as "downloaded_data.csv" in the local file system.
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Instantiate a BlobServiceClient with the Blob Storage account URL and credential
blob_storage_url = "https://mystorageaccount.blob.core.windows.net/"
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=blob_storage_url, credential=credential)
# Download the blob
container_name = "mycontainer"
container_client = blob_service_client.get_container_client(container_name)
blob_name = "data.csv"
downloaded_blob = container_client.download_blob(blob_name)
# Save the blob data to a local file
local_file_path = "downloaded_data.csv"
with open(local_file_path, "wb") as local_file:
local_file.write(downloaded_blob.readall())
Answer: The provided code will download the blob named "data.csv" from the container "mycontainer" in the Azure Blob Storage account and save it as "downloaded_data.csv" in the local file system.
Exercise 69: Blob Storage List Blobs
Question: List all the blobs in the container "mycontainer" in the Azure Blob Storage account.
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Instantiate a BlobServiceClient with the Blob Storage account URL and credential
blob_storage_url = "https://mystorageaccount.blob.core.windows.net/"
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=blob_storage_url, credential=credential)
# List all blobs in the container
container_name = "mycontainer"
container_client = blob_service_client.get_container_client(container_name)
blobs_list = container_client.list_blobs()
# Print the names of all blobs
for blob in blobs_list:
print(blob.name)
Answer: The given code will list all the blobs in the container "mycontainer" in the Azure Blob Storage account and print their names.
Exercise 70: Blob Storage Delete Blob
Question: Delete the blob named "old_data.csv" from the container "mycontainer" in the Azure Blob Storage account.
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Instantiate a BlobServiceClient with the Blob Storage account URL and credential
blob_storage_url = "https://mystorageaccount.blob.core.windows.net/"
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=blob_storage_url, credential=credential)
# Delete the blob
container_name = "mycontainer"
container_client = blob_service_client.get_container_client(container_name)
blob_name = "old_data.csv"
container_client.delete_blob(blob_name)
Answer: The given code will delete the blob named "old_data.csv" from the container "mycontainer" in the Azure Blob Storage account.
Exercise 71: Working with DataFrames
Question: Create a DataFrame from a list of dictionaries and perform the following operations:
- Select columns "name" and "age".
- Filter rows where age is greater than 25.
- Group by name and calculate the average age for each name.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameOperations") \
.getOrCreate()
# Sample data as list of dictionaries
data = [{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
{"name": "Alice", "age": 28},
{"name": "Charlie", "age": 22}]
# Create a DataFrame from the list of dictionaries
df = spark.createDataFrame(data)
# Select columns "name" and "age"
df.select("name", "age").show()
# Filter rows where age is greater than 25
df.filter(df.age > 25).show()
# Group by name and calculate the average age for each name
df.groupBy("name").avg("age").show()
Answer: The provided code will create a DataFrame from the list of dictionaries, select columns "name" and "age", filter rows where age is greater than 25, and then group by name and calculate the average age for each name.
Exercise 72: Reading and Writing CSV Data
Question: Read data from a CSV file "data.csv" and write it to a Parquet file "data.parquet".
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("ReadAndWriteData") \
.getOrCreate()
# Read data from CSV file
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Write data to Parquet file
data_parquet = "data.parquet"
df.write.parquet(data_parquet)
Answer: The given code will read data from the CSV file "data.csv" and write it to a Parquet file "data.parquet".
Exercise 73: Working with SQL Queries
Question: Register the DataFrame from Exercise 72 as a temporary table "data_table" and perform SQL queries to select the names and ages of individuals with ages less than 30.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("SQLQueries") \
.getOrCreate()
# Register DataFrame as temporary table
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
df.createOrReplaceTempView("data_table")
# Perform SQL queries
result = spark.sql("SELECT name, age FROM data_table WHERE age < 30)
result.show()
Answer: The provided code will register the DataFrame from Exercise 72 as a temporary table "data_table" and perform SQL queries to select the names and ages of individuals with ages less than 30.
Exercise 74: SQL Aggregation
Question: Using SQL queries, find the maximum, minimum, and average age of individuals from the DataFrame created in Exercise 72.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("SQLAggregation") \
.getOrCreate()
# Read data from CSV file
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
df.createOrReplaceTempView("data_table")
# Perform SQL aggregation
result = spark.sql("SELECT MAX(age) AS max_age, MIN(age) AS min_age, AVG(age) AS avg_age FROM data_table")
result.show()
Answer: The given code will use SQL queries to find the maximum, minimum, and average age of individuals from the DataFrame created in Exercise 72.
Exercise 75: SQL Joins
Question: Create two DataFrames, "employees_df" and "departments_df", and then perform an inner join to get the names of employees along with their department names.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("SQLJoins") \
.getOrCreate()
# Sample data for employees and departments as lists of dictionaries
employees_data = [{"employee_id": 1, "name": "Alice", "department_id": 101},
{"employee_id": 2, "name": "Bob", "department_id": 102},
{"employee_id": 3, "name": "Charlie", "department_id": 101}]
departments_data = [{"department_id": 101, "department_name": "HR"},
{"department_id": 102, "department_name": "Finance"}]
# Create DataFrames from the lists of dictionaries
employees_df = spark.createDataFrame(employees_data)
departments_df = spark.createDataFrame(departments_data)
# Perform inner join on department_id
result = employees_df.join(departments_df, employees_df.department_id == departments_df.department_id, "inner")
# Select names of employees and their department names
result.select("name", "department_name").show()
Answer: The provided code will create two DataFrames, "employees_df" and "departments_df", and then perform an inner join on "department_id" to get the names of employees along with their department names.
Exercise 76: SQL Subqueries
Question: Using SQL subqueries, find the names of employees who are in the "Finance" department from the DataFrames created in Exercise 75.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("SQLSubqueries") \
.getOrCreate()
# Sample data for employees and departments as lists of dictionaries (Same as Exercise 75)
# Create DataFrames from the lists of dictionaries (Same as Exercise 75)
employees_df = spark.createDataFrame(employees_data)
departments_df = spark.createDataFrame(departments_data)
# Perform SQL subquery to find names of employees in the "Finance" department
result = employees_df.filter("department_id IN (SELECT department_id FROM departments_df WHERE department_name = 'Finance')")
# Select names of employees
result.select("name").show()
Answer: The given code will use SQL subqueries to find the names of employees who are in the "Finance" department from the DataFrames created in Exercise 75.
Exercise 77: Working with RDDs
Question: Create an RDD from a list of numbers and find the sum of all elements.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("RDDOperations") \
.getOrCreate()
# Create an RDD from a list of numbers
numbers = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(numbers)
# Calculate the sum of all elements
sum_of_elements = rdd.reduce(lambda x, y: x + y)
print("Sum of elements:", sum_of_elements)
Answer: The provided code will create an RDD from a list of numbers and calculate the sum of all elements.
Exercise 78: RDD Transformation - Map
Question: Create an RDD from a list of names and convert each name to uppercase using the "map" transformation.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("RDDTransformation") \
.getOrCreate()
# Create an RDD from a list of names
names = ["Alice", "Bob", "Charlie"]
rdd = spark.sparkContext.parallelize(names)
# Convert each name to uppercase using "map" transformation
uppercase_names = rdd.map(lambda name: name.upper())
uppercase_names.collect() # Collect the results to the driver program and print
Answer: The given code will create an RDD from a list of names and convert each name to uppercase using the "map" transformation.
Exercise 79: RDD Transformation - Filter
Question: Create an RDD from a list of numbers and filter out the even numbers using the "filter" transformation.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("RDDTransformation") \
.getOrCreate()
# Create an RDD from a list of numbers
numbers = [1, 2, 3, 4, 5, 6]
rdd = spark.sparkContext.parallelize(numbers)
# Filter out the even numbers using "filter" transformation
even_numbers = rdd.filter(lambda num: num % 2 == 0)
even_numbers.collect() # Collect the results to the driver program and print
Answer: The provided code will create an RDD from a list of numbers and filter out the even numbers using the "filter" transformation.
Exercise 80: Spark Streaming
Question: Create a Spark Streaming application that reads data from a TCP socket and counts the occurrences of each word in the stream.
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
# Create a SparkSession
spark = SparkSession.builder \
.appName("SparkStreamingWordCount") \
.getOrCreate()
# Create a StreamingContext with batch interval of 1 second
ssc = StreamingContext(spark.sparkContext, 1)
# Read data from TCP socket
lines = ssc.socketTextStream("localhost", 9999)
# Split lines into words
words = lines.flatMap(lambda line: line.split())
# Count the occurrences of each word
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
# Print the word counts
word_counts.pprint()
# Start the streaming context
ssc.start()
ssc.awaitTermination()
Answer: The given code will create a Spark Streaming application that reads data from a TCP socket, splits the lines into words, and counts the occurrences of each word in the stream. The word counts will be printed continuously as new data arrives.
Exercise 81: Machine Learning - Linear Regression
Question: Create a DataFrame from a CSV file "data.csv" with columns "feature" and "label" and perform linear regression to predict the label based on the feature.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# Create a SparkSession
spark = SparkSession.builder \
.appName("LinearRegressionExample") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Prepare feature vector using VectorAssembler
feature_cols = ["feature"]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = vector_assembler.transform(df)
# Create and fit the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(df)
# Print the coefficients and intercept
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)
Answer: The provided code will create a DataFrame from the CSV file "data.csv" with columns "feature" and "label", prepare the feature vector, and perform linear regression to predict the label based on the feature.
Exercise 82: Machine Learning - Classification
Question: Create a DataFrame from a CSV file "data.csv" with columns "feature1", "feature2", and "label" and perform classification using the Decision Tree algorithm.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
# Create a SparkSession
spark = SparkSession.builder \
.appName("DecisionTreeClassificationExample") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Prepare feature vector using VectorAssembler
feature_cols = ["feature1", "feature2"]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = vector_assembler.transform(df)
# Create and fit the Decision Tree classifier model
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dt_model = dt.fit(df)
# Print the feature importances
print("Feature Importances:", dt_model.featureImportances)
Answer: The given code will create a DataFrame from the CSV file "data.csv" with columns "feature1", "feature2", and "label", prepare the feature vector, and perform classification using the Decision Tree algorithm to predict the label based on the features. It will also print the feature importances.
Exercise 83: Machine Learning - Clustering
Question: Create a DataFrame from a CSV file "data.csv" with columns "feature1" and "feature2" and perform clustering using the K-means algorithm.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Create a SparkSession
spark = SparkSession.builder \
.appName("KMeansClusteringExample") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Prepare feature vector using VectorAssembler
feature_cols = ["feature1", "feature2"]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = vector_assembler.transform(df)
# Create and fit the K-means clustering model
kmeans = KMeans(featuresCol="features", k=3)
kmeans_model = kmeans.fit(df)
# Print the cluster centers
print("Cluster Centers:")
kmeans_model.clusterCenters()
Answer: The provided code will create a DataFrame from the CSV file "data.csv" with columns "feature1" and "feature2", prepare the feature vector, and perform clustering using the K-means algorithm with k=3 to group data into three clusters. It will also print the cluster centers.
Exercise 84: DataFrame Caching
Question: Create a DataFrame from a CSV file "data.csv" and cache it to improve performance for repeated queries.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameCaching") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Cache the DataFrame
df.cache()
Answer: The provided code will create a DataFrame from the CSV file "data.csv" and cache it in memory, which can improve the performance of repeated queries on the DataFrame.
Exercise 85: DataFrame Repartitioning
Question: Repartition the DataFrame created in Exercise 84 into 5 partitions to distribute data evenly for parallel processing.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameRepartitioning") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Repartition the DataFrame into 5 partitions
df = df.repartition(5)
Answer: The given code will repartition the DataFrame created in Exercise 84 into 5 partitions to distribute data evenly for parallel processing.
Exercise 86: DataFrame UDF
Question: Create a DataFrame from a CSV file "data.csv" with columns "name" and "age", and then define a User-Defined Function (UDF) to add 5 years to the age of each person.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameUDF") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Define a UDF to add 5 years to the age
def add_five_years(age):
return age + 5
# Register the UDF with Spark
add_five_years_udf = udf(add_five_years, IntegerType())
# Apply the UDF to the "age" column
df = df.withColumn("age", add_five_years_udf(df["age"]))
Answer: The provided code will create a DataFrame from the CSV file "data.csv" with columns "name" and "age", and then define a User-Defined Function (UDF) to add 5 years to the age of each person using the "withColumn" function.
Exercise 87: DataFrame Window Functions
Question: Create a DataFrame from a CSV file "data.csv" with columns "name", "department", and "salary". Use window functions to calculate the average salary for each department.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameWindowFunctions") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Define a Window specification partitioned by "department"
window_spec = Window.partitionBy("department")
# Calculate the average salary for each department using window functions
df = df.withColumn("avg_salary", F.avg("salary").over(window_spec))
Answer: The given code will create a DataFrame from the CSV file "data.csv" with columns "name", "department", and "salary", and then use window functions to calculate the average salary for each department.
Exercise 88: DataFrame Cross Join
Question: Create two DataFrames, "employees_df" and "departments_df", and perform a cross join to get all possible combinations of employees and departments.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameCrossJoin") \
.getOrCreate()
# Sample data for employees and departments as lists of dictionaries
employees_data = [{"employee_id": 1, "name": "Alice"},
{"employee_id": 2, "name": "Bob"}]
departments_data = [{"department_id": 101, "department_name": "HR"},
{"department_id": 102, "department_name": "Finance"}]
# Create DataFrames from the lists of dictionaries
employees_df = spark.createDataFrame(employees_data)
departments_df = spark.createDataFrame(departments_data)
# Perform cross join to get all combinations of employees and departments
result = employees_df.crossJoin(departments_df)
result.show()
Answer: The provided code will create two DataFrames, "employees_df" and "departments_df", and then perform a cross join to get all possible combinations of employees and departments.
Exercise 89: DataFrame Pivot
Question: Create a DataFrame from a CSV file "data.csv" with columns "name", "month", and "sales". Use pivot to transform the data, so that each row represents a unique name, and the columns represent the sales for each month.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFramePivot") \
.getOrCreate()
# Read data from CSV file and create DataFrame
data_csv = "data.csv"
df = spark.read.csv(data_csv, header=True, inferSchema=True)
# Pivot the DataFrame to transform data
df_pivot = df.groupBy("name").pivot("month").sum("sales")
Answer: The given code will create a DataFrame from the CSV file "data.csv" with columns "name", "month", and "sales", and then use the pivot function to transform the data so that each row represents a unique name, and the columns represent the sales for each month.
Exercise 90: Calling SQL Stored Procedure
Question: Assume you have a SQL Server database with a stored procedure named "GetEmployeeDetails" that takes an employee ID as input and returns the employee's name and salary. How can you call this stored procedure from PySpark and get the result?
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
# Create a SparkSession
spark = SparkSession.builder \
.appName("CallStoredProcedure") \
.getOrCreate()
# Configure JDBC connection parameters
url = "jdbc:sqlserver://:;databaseName="
properties = {"user": "", "password": "", "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
# Input employee ID
employee_id = 123
# Create a DataFrame with the employee ID
input_df = spark.createDataFrame([(employee_id,)], ["EmployeeID"])
# Register the DataFrame as a temporary table
input_df.createOrReplaceTempView("input_table")
# Call the stored procedure and get the result as a DataFrame
result_df = spark.read.jdbc(url=url, table="GetEmployeeDetails", properties=properties)
# Show the result
result_df.show()
Answer: The provided code will call the SQL Stored Procedure "GetEmployeeDetails" with the input employee ID, and it will get the result as a DataFrame in PySpark. The result will contain the employee's name and salary.
Exercise 91: Calling SQL Stored Procedure with Parameters
Question: Assume you have a SQL Server database with a stored procedure named "GetDepartmentEmployees" that takes a department ID as input and returns the list of employees in that department. How can you call this stored procedure from PySpark with a parameter and get the result?
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
# Create a SparkSession
spark = SparkSession.builder \
.appName("CallStoredProcedureWithParameters") \
.getOrCreate()
# Configure JDBC connection parameters (Same as Exercise 90)
# Input department ID
department_id = 101
# Create a DataFrame with the department ID
input_df = spark.createDataFrame([(department_id,)], ["DepartmentID"])
# Register the DataFrame as a temporary table
input_df.createOrReplaceTempView("input_table")
# Call the stored procedure with parameters and get the result as a DataFrame
result_df = spark.read.jdbc(url=url, table="GetDepartmentEmployees", properties=properties)
# Show the result
result_df.show()
Answer: The given code will call the SQL Stored Procedure "GetDepartmentEmployees" with the input department ID, and it will get the result as a DataFrame in PySpark. The result will contain the list of employees in the specified department.
Exercise 92: Working with Avro Data
Question: You have a set of Avro data files stored in a directory named "avro_data". How can you read these Avro files into a DataFrame in PySpark?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("ReadAvroData") \
.getOrCreate()
# Read Avro data files into a DataFrame
avro_data_dir = "avro_data"
df = spark.read.format("avro").load(avro_data_dir)
# Show the DataFrame
df.show()
Answer: The provided code will read the Avro data files stored in the directory "avro_data" into a DataFrame in PySpark. The resulting DataFrame will contain the Avro data.
Exercise 93: Working with Parquet Data
Question: You have a Parquet data file named "data.parquet". How can you read this Parquet file into a DataFrame in PySpark?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("ReadParquetData") \
.getOrCreate()
# Read Parquet data file into a DataFrame
parquet_file = "data.parquet"
df = spark.read.parquet(parquet_file)
# Show the DataFrame
df.show()
Answer: The given code will read the Parquet data file named "data.parquet" into a DataFrame in PySpark. The resulting DataFrame will contain the Parquet data.
Exercise 94: Writing Data to CSV
Question: You have a DataFrame named "df" that contains some data. How can you write this DataFrame to a CSV file named "output.csv"?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("WriteDataToCSV") \
.getOrCreate()
# Assuming "df" is a DataFrame containing data
# Write the DataFrame to a CSV file
output_csv = "output.csv"
df.write.csv(output_csv)
Answer: The provided code will write the DataFrame "df" containing data to a CSV file named "output.csv". The data will be saved in CSV format.
Exercise 95: Working with JSON Data
Question: You have a JSON data file named "data.json". How can you read this JSON file into a DataFrame in PySpark?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("ReadJSONData") \
.getOrCreate()
# Read JSON data file into a DataFrame
json_file = "data.json"
df = spark.read.json(json_file)
# Show the DataFrame
df.show()
Answer: The given code will read the JSON data file named "data.json" into a DataFrame in PySpark. The resulting DataFrame will contain the JSON data.
Exercise 96: Writing Data to JSON
Question: You have a DataFrame named "df" that contains some data. How can you write this DataFrame to a JSON file named "output.json"?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("WriteDataToJSON") \
.getOrCreate()
# Assuming "df" is a DataFrame containing data
# Write the DataFrame to a JSON file
output_json = "output.json"
df.write.json(output_json)
Answer: The provided code will write the DataFrame "df" containing data to a JSON file named "output.json". The data will be saved in JSON format.
Exercise 97: Working with Hive Tables
Question: Assume you have a Hive table named "employees" in the default database. How can you read this Hive table into a DataFrame in PySpark?
from pyspark.sql import SparkSession
# Create a SparkSession with Hive support
spark = SparkSession.builder \
.appName("ReadHiveTable") \
.enableHiveSupport() \
.getOrCreate()
# Read Hive table into a DataFrame
hive_table = "default.employees"
df = spark.table(hive_table)
# Show the DataFrame
df.show()
Answer: The given code will create a SparkSession with Hive support and then read the Hive table "employees" in the default database into a DataFrame in PySpark. The resulting DataFrame will contain the data from the Hive table.
Exercise 98: DataFrame Joins
Question: You have two DataFrames, "employees_df" and "departments_df", with columns "employee_id", "name" and "department_id", "department_name" respectively. How can you perform an inner join on these DataFrames based on the "department_id" column?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameJoins") \
.getOrCreate()
# Assuming "employees_df" and "departments_df" are two DataFrames
# Perform inner join on the DataFrames based on "department_id"
joined_df = employees_df.join(departments_df, on="department_id", how="inner")
# Show the joined DataFrame
joined_df.show()
Answer: The provided code will perform an inner join on the DataFrames "employees_df" and "departments_df" based on the "department_id" column. The resulting DataFrame "joined_df" will contain the matched rows from both DataFrames.
Exercise 99: DataFrame Union
Question: You have two DataFrames, "df1" and "df2", with the same schema. How can you combine these DataFrames using the union operation?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameUnion") \
.getOrCreate()
# Assuming "df1" and "df2" are two DataFrames with the same schema
# Combine the DataFrames using union
combined_df = df1.union(df2)
# Show the combined DataFrame
combined_df.show()
Answer: The given code will combine the DataFrames "df1" and "df2" using the union operation. The resulting DataFrame "combined_df" will contain all the rows from both DataFrames.
Exercise 100: DataFrame Aggregations
Question: You have a DataFrame named "sales_df" with columns "product", "category", and "sales". How can you calculate the total sales for each category?
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameAggregations") \
.getOrCreate()
# Assuming "sales_df" is a DataFrame with columns "product", "category", and "sales"
# Calculate total sales for each category
total_sales_df = sales_df.groupBy("category").agg(F.sum("sales").alias("total_sales"))
# Show the total sales DataFrame
total_sales_df.show()
Answer: The provided code will calculate the total sales for each category in the DataFrame "sales_df" using the "groupBy" and "agg" functions. The resulting DataFrame "total_sales_df" will contain the total sales for each category.
Exercise 101: DataFrame Filtering
Question: You have a DataFrame named "employees_df" with columns "name" and "age". How can you filter the DataFrame to only keep the employees who are above 30 years old?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameFiltering") \
.getOrCreate()
# Assuming "employees_df" is a DataFrame with columns "name" and "age"
# Filter the DataFrame to keep employees above 30 years old
filtered_df = employees_df.filter(employees_df.age > 30)
# Show the filtered DataFrame
filtered_df.show()
Answer: The provided code will filter the DataFrame "employees_df" to only keep the employees who are above 30 years old. The resulting DataFrame "filtered_df" will contain the filtered data.
Exercise 102: DataFrame Sorting
Question: You have a DataFrame named "sales_df" with columns "product", "date", and "sales". How can you sort the DataFrame in ascending order based on the "sales" column?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameSorting") \
.getOrCreate()
# Assuming "sales_df" is a DataFrame with columns "product", "date", and "sales"
# Sort the DataFrame in ascending order based on "sales" column
sorted_df = sales_df.sort("sales")
# Show the sorted DataFrame
sorted_df.show()
Answer: The given code will sort the DataFrame "sales_df" in ascending order based on the "sales" column. The resulting DataFrame "sorted_df" will contain the data sorted by sales in ascending order.
Exercise 103: DataFrame Drop Columns
Question: You have a DataFrame named "data_df" with columns "col1", "col2", and "col3". How can you drop the "col3" column from the DataFrame?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameDropColumns") \
.getOrCreate()
# Assuming "data_df" is a DataFrame with columns "col1", "col2", and "col3"
# Drop the "col3" column from the DataFrame
data_df = data_df.drop("col3")
# Show the DataFrame after dropping the column
data_df.show()
Answer: The provided code will drop the "col3" column from the DataFrame "data_df". The resulting DataFrame "data_df" will not contain the "col3" column.
Exercise 104: DataFrame GroupBy and Pivot
Question: You have a DataFrame named "sales_df" with columns "product", "category", and "sales". How can you calculate the total sales for each category and pivot the result to show the total sales for each category as separate columns?
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameGroupByAndPivot") \
.getOrCreate()
# Assuming "sales_df" is a DataFrame with columns "product", "category", and "sales"
# Calculate total sales for each category
total_sales_df = sales_df.groupBy("category").agg(F.sum("sales").alias("total_sales"))
# Pivot the result to show total sales for each category as separate columns
pivoted_df = total_sales_df.groupBy().pivot("category").sum("total_sales")
# Show the pivoted DataFrame
pivoted_df.show()
Answer: The given code will calculate the total sales for each category in the DataFrame "sales_df" using the "groupBy" and "agg" functions. It will then pivot the result to show the total sales for each category as separate columns in the DataFrame "pivoted_df".
Exercise 105: DataFrame Window Functions - Ranking
Question: You have a DataFrame named "employees_df" with columns "name" and "salary". How can you add a new column "rank" that shows the ranking of employees based on their salary in descending order?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameWindowFunctionsRanking") \
.getOrCreate()
# Assuming "employees_df" is a DataFrame with columns "name" and "salary"
# Define a Window specification to order by salary in descending order
window_spec = Window.orderBy(F.desc("salary"))
# Add a new column "rank" to show the ranking of employees based on salary
employees_df = employees_df.withColumn("rank", F.rank().over(window_spec))
# Show the DataFrame with the new "rank" column
employees_df.show()
Answer: The provided code will add a new column "rank" to the DataFrame "employees_df", which shows the ranking of employees based on their salary in descending order.
Exercise 106: DataFrame Window Functions - Running Total
Question: You have a DataFrame named "sales_df" with columns "date" and "sales". How can you add a new column "running_total" that shows the running total of sales ordered by date?
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameWindowFunctionsRunningTotal") \
.getOrCreate()
# Assuming "sales_df" is a DataFrame with columns "date" and "sales"
# Define a Window specification to order by date
window_spec = Window.orderBy("date")
# Add a new column "running_total" to show the running total of sales
sales_df = sales_df.withColumn("running_total", F.sum("sales").over(window_spec))
# Show the DataFrame with the new "running_total" column
sales_df.show()
Answer: The given code will add a new column "running_total" to the DataFrame "sales_df", which shows the running total of sales ordered by date.
Exercise 107: Working with Broadcast Variables
Question: You have a large DataFrame named "data_df" and a small lookup DataFrame named "lookup_df" with columns "key" and "value". How can you efficiently join the two DataFrames using a broadcast variable?
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("BroadcastVariables") \
.getOrCreate()
# Assuming "data_df" is a large DataFrame and "lookup_df" is a small DataFrame
# Broadcast the lookup DataFrame for efficient join
lookup_broadcast = F.broadcast(lookup_df)
# Join the two DataFrames using the broadcast variable
joined_df = data_df.join(lookup_broadcast, on="key", how="left")
# Show the joined DataFrame
joined_df.show()
Answer: The provided code will efficiently join the two DataFrames "data_df" and "lookup_df" using a broadcast variable to speed up the join operation. The resulting DataFrame "joined_df" will contain the combined data from both DataFrames.
Exercise 108: DataFrame Repartitioning
Question: You have a DataFrame named "data_df" with a large number of partitions. How can you repartition the DataFrame into a smaller number of partitions for better performance?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("RepartitionDataFrame") \
.getOrCreate()
# Assuming "data_df" is a DataFrame with a large number of partitions
# Repartition the DataFrame into a smaller number of partitions (e.g., 4)
repartitioned_df = data_df.repartition(4)
# Show the repartitioned DataFrame
repartitioned_df.show()
Answer: The given code will repartition the DataFrame "data_df" into a smaller number of partitions (e.g., 4 partitions) for better performance. The resulting DataFrame "repartitioned_df" will have fewer partitions than the original DataFrame.
Exercise 109: DataFrame Caching
Question: You have a DataFrame named "data_df" that is used multiple times in the data processing pipeline. How can you cache the DataFrame to improve performance?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("DataFrameCaching") \
.getOrCreate()
# Assuming "data_df" is a DataFrame that is used multiple times
# Cache the DataFrame for improved performance
data_df.cache()
# Perform various operations on the cached DataFrame
# (e.g., data_df.select(...), data_df.filter(...), etc.)
# Show the DataFrame
data_df.show()
Answer: The provided code will cache the DataFrame "data_df" to improve performance, especially when it is used multiple times in the data processing pipeline. Subsequent operations on the DataFrame will be faster as it is now cached in memory.
Exercise 110: Handling Missing Data
Question: You have a DataFrame named "data_df" with columns "name", "age", and "gender". The DataFrame contains some missing values in the "age" column. How can you handle the missing data in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName("HandlingMissingData") \
.getOrCreate()
# Assuming "data_df" is a DataFrame with columns "name", "age", and "gender"
# Option 1: Fill missing values with a default value (e.g., -1)
data_df_filled = data_df.fillna(-1, subset=["age"])
# Option 2: Drop rows with missing values
data_df_dropped = data_df.dropna(subset=["age"])
# Option 3: Impute missing values with the mean value of the "age" column
mean_age = data_df.select(F.mean("age")).collect()[0][0]
data_df_imputed = data_df.fillna(mean_age, subset=["age"])
# Show the DataFrames
data_df_filled.show()
data_df_dropped.show()
data_df_imputed.show()
Answer: The provided code shows three different ways to handle missing data in the DataFrame "data_df". Option 1 fills missing values in the "age" column with a default value (e.g., -1), Option 2 drops rows with missing values in the "age" column, and Option 3 imputes missing values in the "age" column with the mean value of the available "age" values.
Congratulations! You have completed all 110 PySpark practice exercises for beginners. These exercises cover various essential concepts and operations in PySpark and will help you gain confidence in using PySpark for data processing and analysis tasks. Remember to experiment with different scenarios and datasets to deepen your understanding of PySpark's capabilities. Keep practicing, and happy PySpark coding! 😊
Comments