Simplify Spark deps packaging (#114)
guidok91 authored Jan 24, 2025
1 parent 15cc225 commit 4586dd6
Showing 6 changed files with 520 additions and 445 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -3,7 +3,7 @@ FROM python:3.13-slim
ENV PATH="/root/.local/bin:$PATH"

RUN apt-get update && \
- apt-get install -y default-jre-headless make git curl && \
+ apt-get install -y default-jre-headless make git curl zip && \
rm -rf /var/lib/apt/lists/*

COPY . .
14 changes: 4 additions & 10 deletions Makefile
@@ -19,16 +19,10 @@ docker-run: # Spin up a local container in interactive mode.
docker run --platform=linux/amd64 --rm -it spark-movies-etl bash

.PHONY: package
- package: # Package the app and its dependencies to be used in spark-submit.
+ package: # Package the app to be used in spark-submit.
rm -rf deps
mkdir deps
- poetry export -f requirements.txt --output deps/requirements.txt
- poetry run python -m venv deps/.venv
- . deps/.venv/bin/activate && \
- pip install --upgrade pip setuptools wheel && \
- pip install -r deps/requirements.txt && \
- venv-pack -o deps/venv.tar.gz
- rm -r deps/.venv deps/requirements.txt
+ zip -r deps/deps.zip movies_etl

.PHONY: test
test: # Run unit and integration tests.
@@ -40,13 +34,13 @@ lint: # Run code linting tools.

.PHONY: run-app
run-app: # Run pipeline (example: EXECUTION_DATE=2021-01-01 ENV_FOR_DYNACONF=development SPARK_MASTER=local[*] DEPLOY_MODE=client make run-app).
- PYSPARK_DRIVER_PYTHON=python PYSPARK_PYTHON=./environment/bin/python poetry run spark-submit \
+ poetry run spark-submit \
--master ${SPARK_MASTER} \
--deploy-mode ${DEPLOY_MODE} \
--packages io.delta:delta-spark_2.12:$(DELTA_VERSION) \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
- --archives deps/venv.tar.gz#environment \
+ --py-files deps/deps.zip \
movies_etl/main.py \
--execution-date ${EXECUTION_DATE} \
--config-file-path app_config.yaml
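
Not part of the commit, just context on what the simplified packaging relies on: `zip -r deps/deps.zip movies_etl` archives only the application package, and `--py-files deps/deps.zip` makes Spark distribute that archive and add it to the Python path on the driver and executors. A minimal illustrative check, assuming a job launched with the `spark-submit` command above (the helper name below is invented for the example):

```python
# Illustrative sketch only (not repo code): verify that the movies_etl package
# shipped via --py-files deps/deps.zip is importable inside executor tasks.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def import_on_executor(_: int) -> str:
    import movies_etl  # resolved from deps/deps.zip on the executor
    return movies_etl.__name__

print(spark.sparkContext.parallelize([0]).map(import_on_executor).collect())  # ['movies_etl']
```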
2 changes: 1 addition & 1 deletion README.md
@@ -37,7 +37,7 @@ Dependabot is configured to periodically upgrade repo dependencies. See [dependa

Since there are multiple ways of deploying and running Spark applications in production (Kubernetes, AWS EMR, Databricks, etc), this repo aims to be as agnostic and generic as possible. The application and its dependencies are built into a Docker image (see [Dockerfile](Dockerfile)).

- In order to distribute code and dependencies across Spark executors [this method](https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#using-virtualenv) is used.
+ In order to distribute code and dependencies across Spark executors [this method](https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#using-pyspark-native-features) is used.

## CI/CD
Github Actions workflows for CI/CD are defined [here](.github/workflows) and can be seen [here](https://github.com/guidok91/spark-movies-etl/actions).
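
The linked "PySpark native features" section describes exactly this: shipping a .zip of the application code via `spark-submit --py-files` (as the Makefile above now does) or via its programmatic counterpart `SparkContext.addPyFile`. A hedged sketch of the latter, assuming `deps/deps.zip` has already been built with `make package`:

```python
# Sketch only: programmatic equivalent of passing --py-files deps/deps.zip to spark-submit.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("movies-etl-adhoc").getOrCreate()
spark.sparkContext.addPyFile("deps/deps.zip")  # distribute the zip and add it to the Python path

import movies_etl  # noqa: E402 - importable on the driver and inside executor tasks from here on
```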
6 changes: 4 additions & 2 deletions movies_etl/tasks/curate_data.py
@@ -1,6 +1,8 @@
import datetime
import os
+ import pkgutil

+ import yaml
from pyspark.sql import DataFrame, SparkSession
from soda.scan import Scan

@@ -39,13 +41,13 @@ def _run_data_quality_checks(self) -> None:
self.logger.info(f"Running Data Quality checks for output ({self.path_output}).")
self.spark.read.format("delta").load(self.path_output).createOrReplaceTempView("movie_ratings_curated")

- dq_checks_config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "curate_data_checks.yaml")
+ dq_checks_config = str(yaml.safe_load(pkgutil.get_data(__name__, "curate_data_checks.yaml")))  # type: ignore
scan = Scan()

scan.set_data_source_name("spark_df")
scan.add_spark_session(self.spark)
scan.add_variables({"run_date": self.execution_date.strftime("%Y-%m-%d")})
- scan.add_sodacl_yaml_file(dq_checks_config_file)
+ scan.add_sodacl_yaml_str(dq_checks_config)

scan.execute()
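
The switch from an `os.path`-based file path to `pkgutil.get_data` is what lets this keep working when the code is shipped as `deps/deps.zip`: the YAML sits inside the archive rather than next to a plain `.py` file on disk, and `pkgutil` reads it through the package loader (including the zip importer). A standalone sketch of the pattern, with the package and resource names taken from the diff (the printed value depends on the YAML contents):

```python
# Sketch of the resource-loading pattern used above: works whether movies_etl is an
# on-disk package or is imported from a zip distributed with --py-files.
import pkgutil

import yaml

raw = pkgutil.get_data("movies_etl.tasks", "curate_data_checks.yaml")  # bytes, or None if not found
checks = yaml.safe_load(raw) if raw is not None else None  # safe_load accepts bytes
print(checks)
```

In the diff itself, the parsed result is stringified and handed to Soda via `scan.add_sodacl_yaml_str`.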

(The remaining 2 changed files are not shown.)