LAB / PROJECT: Kubeflow Pipeline (From Scratch) with Custom Docker Images (Decision Tree, Logistic Regression, SVM, Naive Bayes, Xg Boost)
This lab/project shows:
- how to create a Kubeflow Pipeline with custom Docker images
- all files:
- Download Data Component
- Decision Tree Component
- Logistic Regression Component
- SVM Component
- Naive Bayes Component
- Xg Boost Component
- Show Results Component
- Compiling, Uploading Pipeline into Kubeflow and Running
- References
- You need a Kubeflow environment (the easiest way is to use MiniKF).
- Create the Python code and the pipeline component (Docker image) for each step:
- Download Data Component (YAML file, plus a Docker image whose Dockerfile includes the component's Python script)
- Decision Tree Component (YAML file, plus a Docker image whose Dockerfile includes the component's Python script)
- Logistic Regression Component (YAML file, plus a Docker image whose Dockerfile includes the component's Python script)
- SVM Component (YAML file, plus a Docker image whose Dockerfile includes the component's Python script)
- Naive Bayes Component (YAML file, plus a Docker image whose Dockerfile includes the component's Python script)
- XG Boost Component (YAML file, plus a Docker image whose Dockerfile includes the component's Python script)
- Show Results Component
import json
import argparse
from pathlib import Path

from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split


def _download_data(args):
    # Gets the data from the sklearn library and splits the dataset
    x, y = load_breast_cancer(return_X_y=True)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

    # Creates the `data` structure to save
    data = {'x_train': x_train.tolist(),
            'y_train': y_train.tolist(),
            'x_test': x_test.tolist(),
            'y_test': y_test.tolist()}

    # Creates a json string based on `data`
    data_json = json.dumps(data)

    # Saves the json string into a file (the consumers decode it twice)
    with open(args.data, 'w') as output_file:
        json.dump(data_json, output_file)


if __name__ == '__main__':
    # This component does not receive any input, it only outputs one artifact, which is `data`.
    parser = argparse.ArgumentParser()
    # Output argument: data
    parser.add_argument('--data', type=str)
    args = parser.parse_args()

    # Creates the directory where the OUTPUT file will be written (the directory may or may not exist).
    # The file is used as the input of the other components (e.g. decision tree, logistic regression).
    Path(args.data).parent.mkdir(parents=True, exist_ok=True)

    _download_data(args)
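The download script calls train_test_split(x, y, test_size=0.2) without a random_state, so every pipeline run trains and evaluates on a different split. The sketch below only illustrates the idea with the standard library: it computes the 80/20 sizes for the 569 rows of the breast cancer dataset and shows that a fixed seed reproduces the same split. The helper names split_sizes and split_indices are hypothetical, and the shuffle does not reproduce scikit-learn's own random number generator.

```python
import math
import random


def split_sizes(n_samples, test_size):
    """Return (n_train, n_test) for a fractional test_size (test part rounded up)."""
    n_test = math.ceil(test_size * n_samples)
    return n_samples - n_test, n_test


def split_indices(n_samples, test_size, seed):
    """Shuffle the row indices with a fixed seed and cut them into train/test parts."""
    n_train, _ = split_sizes(n_samples, test_size)
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    return indices[:n_train], indices[n_train:]


# The breast cancer dataset has 569 rows.
n_train, n_test = split_sizes(569, 0.2)
print(n_train, n_test)  # 455 114

# The same seed gives the same split on every call.
train_a, test_a = split_indices(569, 0.2, seed=42)
train_b, test_b = split_indices(569, 0.2, seed=42)
print(test_a == test_b)  # True
```

In the real script, passing a fixed random_state to train_test_split has the same effect, which makes the accuracies of different runs comparable.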
- Create download data component (download_data.yaml)
name: Download Data Function
description: Download toy data from sklearn datasets
outputs:
- {name: Data, type: LocalPath, description: 'Path where data will be stored.'}
implementation:
  container:
    image: omerbsezer/kubeflow_component:download_breast_cancer_data_v1
    command: [
      {outputPath: Data},
- Create requirements.txt:
- Dockerfile:
FROM python:3.8-slim
WORKDIR /pipeline
COPY requirements.txt /pipeline
RUN pip install -r requirements.txt
COPY /pipeline
- Go to the directory that contains the Dockerfile, then build the image and push it to the Docker registry:
docker image build -t omerbsezer/kubeflow_component:download_breast_cancer_data_v1 .
docker push omerbsezer/kubeflow_component:download_breast_cancer_data_v1
- Create the Python script for the decision tree component:
import json
import argparse
from pathlib import Path

from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier


def _decision_tree(args):
    # Opens and reads the file "data"
    with open(args.data) as data_file:
        data = json.load(data_file)

    # The file holds a JSON-encoded string (the download component serialized the dict twice),
    # so it must be decoded a second time to get the dict-type object.
    data = json.loads(data)

    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

    # Initialize and train the model
    model = DecisionTreeClassifier(max_depth=4)
    model.fit(x_train, y_train)

    # Get predictions
    y_pred = model.predict(x_test)

    # Get accuracy
    accuracy = accuracy_score(y_test, y_pred)

    # Save output into file
    with open(args.accuracy, 'w') as accuracy_file:
        accuracy_file.write(str(accuracy))


if __name__ == '__main__':
    # Defining and parsing the command-line arguments
    parser = argparse.ArgumentParser(description='My program description')
    # Input argument: data
    parser.add_argument('--data', type=str)
    # Output argument: accuracy
    parser.add_argument('--accuracy', type=str)
    args = parser.parse_args()

    # Creates the directory where the OUTPUT file will be written (the directory may or may not exist).
    Path(args.accuracy).parent.mkdir(parents=True, exist_ok=True)

    _decision_tree(args)
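The model scripts decode the data file twice because the download component encodes it twice: json.dumps turns the dict into a string, and json.dump then writes that string as a JSON string literal. A minimal sketch of the round trip, using a temporary file and placeholder values instead of the real dataset:

```python
import json
import tempfile
from pathlib import Path

# Placeholder values, not the real dataset.
data = {'x_train': [[1.0, 2.0]], 'y_train': [0]}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / 'data'

    # Producer side: dumps() builds a string, dump() then encodes that string again.
    with open(path, 'w') as output_file:
        json.dump(json.dumps(data), output_file)

    # Consumer side: the first decode only returns the inner string.
    with open(path) as data_file:
        first = json.load(data_file)

# The second decode returns the dict.
second = json.loads(first)

print(type(first).__name__, type(second).__name__)  # str dict
```

Writing the dict with a single json.dump(data, output_file) would make one json.load enough, but the producer and all consumers would have to change together.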
- Create decision tree component (decision_tree.yaml)
name: Decision Tree classifier
description: Trains a decision tree classifier
inputs:
- {name: Data, type: LocalPath, description: 'Path where data is stored.'}
outputs:
- {name: Accuracy, type: Float, description: 'Accuracy metric'}
implementation:
  container:
    image: omerbsezer/kubeflow_component:decision_tree_v1
    command: [
      {inputPath: Data},
      {outputPath: Accuracy},
- Create requirements.txt:
- Dockerfile:
FROM python:3.8-slim
WORKDIR /pipelines
COPY requirements.txt /pipelines
RUN pip install -r requirements.txt
COPY /pipelines
- Go to the directory that contains the Dockerfile, then build the image and push it to the Docker registry:
docker image build -t omerbsezer/kubeflow_component:decision_tree_v1 .
docker push omerbsezer/kubeflow_component:decision_tree_v1
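In the component YAML, {inputPath: Data} and {outputPath: Accuracy} are placeholders. At run time Kubeflow Pipelines replaces each one with a file path inside the container, and the script receives those paths through its --data and --accuracy arguments. The sketch below imitates that substitution for illustration only; resolve_command is a hypothetical helper and not part of the kfp SDK, the paths are made up, and the interpreter and script entries of the command are left out.

```python
import argparse


def resolve_command(command, input_paths, output_paths):
    """Replace inputPath/outputPath placeholders with concrete file paths."""
    resolved = []
    for item in command:
        if isinstance(item, dict) and 'inputPath' in item:
            resolved.append(input_paths[item['inputPath']])
        elif isinstance(item, dict) and 'outputPath' in item:
            resolved.append(output_paths[item['outputPath']])
        else:
            resolved.append(item)
    return resolved


# Only the flag/placeholder pairs of the command are modelled here.
command_args = ['--data', {'inputPath': 'Data'},
                '--accuracy', {'outputPath': 'Accuracy'}]

# Hypothetical container paths; the real ones are chosen by Kubeflow.
argv = resolve_command(
    command_args,
    input_paths={'Data': '/tmp/inputs/Data/data'},
    output_paths={'Accuracy': '/tmp/outputs/Accuracy/data'},
)

# The same argument definitions as in the component script.
parser = argparse.ArgumentParser()
parser.add_argument('--data', type=str)
parser.add_argument('--accuracy', type=str)
args = parser.parse_args(argv)

print(args.data)      # /tmp/inputs/Data/data
print(args.accuracy)  # /tmp/outputs/Accuracy/data
```

This is why the scripts treat both arguments as plain file paths: they read the input file and write the output file, and Kubeflow moves the contents between steps.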
- Create the Python script for the logistic regression component:
import json
import argparse
from pathlib import Path

from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression


def _logistic_regression(args):
    # Opens and reads the file "data"
    with open(args.data) as data_file:
        data = json.load(data_file)

    # The file holds a JSON-encoded string (the download component serialized the dict twice),
    # so it must be decoded a second time to get the dict-type object.
    data = json.loads(data)

    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

    # Initialize and train the model
    model = LogisticRegression()
    model.fit(x_train, y_train)

    # Get predictions
    y_pred = model.predict(x_test)

    # Get accuracy
    accuracy = accuracy_score(y_test, y_pred)

    # Save output into file
    with open(args.accuracy, 'w') as accuracy_file:
        accuracy_file.write(str(accuracy))


if __name__ == '__main__':
    # Defining and parsing the command-line arguments
    parser = argparse.ArgumentParser(description='My program description')
    # Input argument: data
    parser.add_argument('--data', type=str)
    # Output argument: accuracy
    parser.add_argument('--accuracy', type=str)
    args = parser.parse_args()

    # Creates the directory where the OUTPUT file will be written (the directory may or may not exist).
    Path(args.accuracy).parent.mkdir(parents=True, exist_ok=True)

    _logistic_regression(args)
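Each model script hands its result to the next step by writing the accuracy as text to the output path, after making sure the parent directory exists. A minimal sketch of that hand-off, assuming a temporary directory in place of the container path; write_accuracy and read_accuracy are hypothetical helper names, and 0.5 is a placeholder value rather than a measured result.

```python
import tempfile
from pathlib import Path


def write_accuracy(path, accuracy):
    """Write the metric as text, creating the parent directory if needed."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(str(accuracy))


def read_accuracy(path):
    """Read the metric back, the way a consumer expecting a float would."""
    return float(Path(path).read_text())


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical output location; the nested directories do not exist yet.
    output_path = Path(tmp) / 'outputs' / 'Accuracy' / 'data'
    write_accuracy(output_path, 0.5)  # placeholder value
    value = read_accuracy(output_path)

print(value)  # 0.5
```

Without the mkdir call, opening the file for writing fails when the output directory has not been created yet, which is why every script creates it before running the model.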
- Create logistic regression component (logistic_regression.yaml)
name: Logistic Regression Classifier
description: Trains a Logistic Regression Classifier
inputs:
- {name: Data, type: LocalPath, description: 'Path where data is stored.'}
outputs:
- {name: Accuracy, type: Float, description: 'Accuracy metric'}
implementation:
  container:
    image: omerbsezer/kubeflow_component:logistic_regression_v1
    command: [
      {inputPath: Data},
      {outputPath: Accuracy},
- Create requirements.txt:
- Dockerfile:
FROM python:3.8-slim
WORKDIR /pipelines
COPY requirements.txt /pipelines
RUN pip install -r requirements.txt
COPY /pipelines
- Go to the directory that contains the Dockerfile, then build the image and push it to the Docker registry:
docker image build -t omerbsezer/kubeflow_component:logistic_regression_v1 .
docker push omerbsezer/kubeflow_component:logistic_regression_v1
- Create the Python script for the SVM component:
import json
import argparse
from pathlib import Path

from sklearn.metrics import accuracy_score
from sklearn.svm import SVC


def _svm(args):
    # Opens and reads the file "data"
    with open(args.data) as data_file:
        data = json.load(data_file)

    # The file holds a JSON-encoded string (the download component serialized the dict twice),
    # so it must be decoded a second time to get the dict-type object.
    data = json.loads(data)

    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

    # Initialize and train the model
    model = SVC(kernel='linear')
    model.fit(x_train, y_train)

    # Get predictions
    y_pred = model.predict(x_test)

    # Get accuracy
    accuracy = accuracy_score(y_test, y_pred)

    # Save output into file
    with open(args.accuracy, 'w') as accuracy_file:
        accuracy_file.write(str(accuracy))


if __name__ == '__main__':
    # Defining and parsing the command-line arguments
    parser = argparse.ArgumentParser(description='My program description')
    # Input argument: data
    parser.add_argument('--data', type=str)
    # Output argument: accuracy
    parser.add_argument('--accuracy', type=str)
    args = parser.parse_args()

    # Creates the directory where the OUTPUT file will be written (the directory may or may not exist).
    Path(args.accuracy).parent.mkdir(parents=True, exist_ok=True)

    _svm(args)
- Create SVM component (svm.yaml)
name: Support Vector (svm) classifier
description: Trains an SVM classifier
inputs:
- {name: Data, type: LocalPath, description: 'Path where data is stored.'}
outputs:
- {name: Accuracy, type: Float, description: 'Accuracy metric'}
implementation:
  container:
    image: omerbsezer/kubeflow_component:svm_v1
    command: [
      {inputPath: Data},
      {outputPath: Accuracy},
- Create requirements.txt:
- Dockerfile:
FROM python:3.8-slim
WORKDIR /pipelines
COPY requirements.txt /pipelines
RUN pip install -r requirements.txt
COPY /pipelines
- Go to the directory that contains the Dockerfile, then build the image and push it to the Docker registry:
docker image build -t omerbsezer/kubeflow_component:svm_v1 .
docker push omerbsezer/kubeflow_component:svm_v1
- Create the Python script for the Naive Bayes component:
import json
import argparse
from pathlib import Path

from sklearn.metrics import accuracy_score
from sklearn.naive_bayes import GaussianNB


def _naive_bayes(args):
    # Opens and reads the file "data"
    with open(args.data) as data_file:
        data = json.load(data_file)

    # The file holds a JSON-encoded string (the download component serialized the dict twice),
    # so it must be decoded a second time to get the dict-type object.
    data = json.loads(data)

    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

    # Initialize and train the model
    model = GaussianNB()
    model.fit(x_train, y_train)

    # Get predictions
    y_pred = model.predict(x_test)

    # Get accuracy
    accuracy = accuracy_score(y_test, y_pred)

    # Save output into file
    with open(args.accuracy, 'w') as accuracy_file:
        accuracy_file.write(str(accuracy))


if __name__ == '__main__':
    # Defining and parsing the command-line arguments
    parser = argparse.ArgumentParser(description='My program description')
    # Input argument: data
    parser.add_argument('--data', type=str)
    # Output argument: accuracy
    parser.add_argument('--accuracy', type=str)
    args = parser.parse_args()

    # Creates the directory where the OUTPUT file will be written (the directory may or may not exist).
    Path(args.accuracy).parent.mkdir(parents=True, exist_ok=True)

    _naive_bayes(args)
- Create naive bayes component (naive_bayes.yaml)
name: Naive Bayes classifier
description: Trains a Naive Bayes classifier
inputs:
- {name: Data, type: LocalPath, description: 'Path where data is stored.'}
outputs:
- {name: Accuracy, type: Float, description: 'Accuracy metric'}
implementation:
  container:
    image: omerbsezer/kubeflow_component:naive_bayes_v1
    command: [
      {inputPath: Data},
      {outputPath: Accuracy},
- Create requirements.txt:
- Dockerfile:
FROM python:3.8-slim
WORKDIR /pipelines
COPY requirements.txt /pipelines
RUN pip install -r requirements.txt
COPY /pipelines
- Go to the directory that contains the Dockerfile, then build the image and push it to the Docker registry:
docker image build -t omerbsezer/kubeflow_component:naive_bayes_v1 .
docker push omerbsezer/kubeflow_component:naive_bayes_v1
- Create the Python script for the XG Boost component:
import json
import argparse
from pathlib import Path

from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier


def _xg_boost(args):
    # Opens and reads the file "data"
    with open(args.data) as data_file:
        data = json.load(data_file)

    # The file holds a JSON-encoded string (the download component serialized the dict twice),
    # so it must be decoded a second time to get the dict-type object.
    data = json.loads(data)

    x_train = data['x_train']
    y_train = data['y_train']
    x_test = data['x_test']
    y_test = data['y_test']

    # Initialize and train the model
    model = XGBClassifier()
    model.fit(x_train, y_train)

    # Get predictions
    y_pred = model.predict(x_test)

    # Get accuracy
    accuracy = accuracy_score(y_test, y_pred)

    # Save output into file
    with open(args.accuracy, 'w') as accuracy_file:
        accuracy_file.write(str(accuracy))


if __name__ == '__main__':
    # Defining and parsing the command-line arguments
    parser = argparse.ArgumentParser(description='My program description')
    # Input argument: data
    parser.add_argument('--data', type=str)
    # Output argument: accuracy
    parser.add_argument('--accuracy', type=str)
    args = parser.parse_args()

    # Creates the directory where the OUTPUT file will be written (the directory may or may not exist).
    Path(args.accuracy).parent.mkdir(parents=True, exist_ok=True)

    _xg_boost(args)
- Create XgBoost component (xg_boost.yaml)
name: Xg Boost classifier
description: Trains an xg boost classifier
inputs:
- {name: Data, type: LocalPath, description: 'Path where data is stored.'}
outputs:
- {name: Accuracy, type: Float, description: 'Accuracy metric'}
implementation:
  container:
    image: omerbsezer/kubeflow_component:xg_boost_v1
    command: [
      {inputPath: Data},
      {outputPath: Accuracy},
- Create requirements.txt:
- Dockerfile:
FROM python:3.8-slim
WORKDIR /pipelines
COPY requirements.txt /pipelines
RUN pip install -r requirements.txt
COPY /pipelines
- Go to the directory that contains the Dockerfile, then build the image and push it to the Docker registry:
docker image build -t omerbsezer/kubeflow_component:xg_boost_v1 .
docker push omerbsezer/kubeflow_component:xg_boost_v1
- This component consists of the following function. It does not need a separate Docker image.
def show_results(decision_tree : float, logistic_regression : float, svm : float, naive_bayes : float, xg_boost : float) -> None:
    # Prints the outputs of the decision_tree, logistic_regression, svm, naive_bayes and xg_boost components
    print(f"Decision tree (accuracy): {decision_tree}")
    print(f"Logistic regression (accuracy): {logistic_regression}")
    print(f"SVM (SVC) (accuracy): {svm}")
    print(f"Naive Bayes (Gaussian) (accuracy): {naive_bayes}")
    print(f"XG Boost (accuracy): {xg_boost}")
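- Install the kfp package. The pipeline code uses the KFP v1 SDK (for example 'func_to_container_op', which was removed in kfp 2.x), so pin the version below 2:
pip install "kfp<2"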
- Create the pipeline script:
import kfp
from kfp import dsl
from kfp.components import func_to_container_op


# The decorator turns the plain Python function into a pipeline component
@func_to_container_op
def show_results(decision_tree : float, logistic_regression : float, svm : float, naive_bayes : float, xg_boost : float) -> None:
    # Prints the outputs of the decision_tree, logistic_regression, svm, naive_bayes and xg_boost components
    print(f"Decision tree (accuracy): {decision_tree}")
    print(f"Logistic regression (accuracy): {logistic_regression}")
    print(f"SVM (SVC) (accuracy): {svm}")
    print(f"Naive Bayes (Gaussian) (accuracy): {naive_bayes}")
    print(f"XG Boost (accuracy): {xg_boost}")


@dsl.pipeline(name='ML Models Pipeline', description='Applies Decision Tree, Logistic Regression, SVM, Naive Bayes, XG Boost for classification problem.')
def ml_models_pipeline():
    # Loads the yaml manifest for each component
    download = kfp.components.load_component_from_file('download_data/download_data.yaml')
    decision_tree = kfp.components.load_component_from_file('decision_tree/decision_tree.yaml')
    logistic_regression = kfp.components.load_component_from_file('logistic_regression/logistic_regression.yaml')
    svm = kfp.components.load_component_from_file('svm/svm.yaml')
    naive_bayes = kfp.components.load_component_from_file('naive_bayes/naive_bayes.yaml')
    xg_boost = kfp.components.load_component_from_file('xg_boost/xg_boost.yaml')

    # Run download_data task
    download_task = download()

    # Run ML models tasks with input data
    decision_tree_task = decision_tree(download_task.output)
    logistic_regression_task = logistic_regression(download_task.output)
    svm_task = svm(download_task.output)
    naive_bayes_task = naive_bayes(download_task.output)
    xg_boost_task = xg_boost(download_task.output)

    # Given the outputs from ML models tasks
    # the component "show_results" is called to print the results.
    show_results(decision_tree_task.output, logistic_regression_task.output, svm_task.output, naive_bayes_task.output, xg_boost_task.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(ml_models_pipeline, 'MLModelsPipeline.yaml')
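The pipeline function never states an execution order. The order follows from the data wiring: every model task consumes download_task.output, and show_results consumes the output of all five model tasks. The sketch below derives the resulting stages from those dependencies with plain Python. It is an illustration of the dependency graph, not how Kubeflow or Argo schedules work internally, and the task names are just labels chosen for this example.

```python
def execution_stages(dependencies):
    """Group tasks into stages; tasks in the same stage have no pending dependencies."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    stages = []
    while remaining:
        ready = sorted(task for task, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError('dependency cycle detected')
        stages.append(ready)
        for task in ready:
            del remaining[task]
        for deps in remaining.values():
            deps.difference_update(ready)
    return stages


models = ['decision_tree', 'logistic_regression', 'svm', 'naive_bayes', 'xg_boost']

# task -> tasks whose outputs it consumes
dependencies = {'download': set(), 'show_results': set(models)}
for model in models:
    dependencies[model] = {'download'}

stages = execution_stages(dependencies)
for number, stage in enumerate(stages, start=1):
    print(number, stage)
# 1 ['download']
# 2 ['decision_tree', 'logistic_regression', 'naive_bayes', 'svm', 'xg_boost']
# 3 ['show_results']
```

The five model tasks land in the same stage because none of them depends on another, which is what allows them to run in parallel.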
- Run the pipeline script to compile the pipeline into an Argo Workflow definition. This produces 'MLModelsPipeline.yaml'.
Import the generated 'MLModelsPipeline.yaml' into Kubeflow:
Create an experiment:
Create a run:
Kubeflow creates the pipeline and runs it:
With 'kubectl get pods -n kubeflow-user', we can follow the status of the running pods in the K8s cluster (in our scenario, in MiniKF).
Clicking on a step shows its details:
Clicking on the last step:
The models run in parallel. In this run, XGBoost and Decision Tree have the best accuracy: