Skip to content

Commit

Permalink
Merge pull request #219 from mlrun/0.10.x-dev
Browse files Browse the repository at this point in the history
0.10.x dev
  • Loading branch information
aviaIguazio authored Jan 20, 2022
2 parents b927c5a + 0025bae commit 55d2504
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 108 deletions.
96 changes: 48 additions & 48 deletions network-operations/01-ingest.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
"metadata": {},
"source": [
"# Network Operations Demo - Data Ingestion & Preparation\n",
"This project demonstrates how to build an automated machine-learning (ML) pipeline for predicting network outages based on network-device telemetry, also known as Network Operations (NetOps). The demo covers how to setup a real-time system for data ingestion, feature engineering, model training, deployment and monitoring on top of Iguazio's Data Science Platform. We use the open source MLRun MLOps orchestration framework and Feature Store in combination with the open source Nuclio serverless engine as a runtime.\n",
"This project demonstrates how to build an automated machine-learning (ML) pipeline for predicting network outages based on network-device telemetry, also known as Network Operations (NetOps). The demo covers how to setup a real-time system for data ingestion, feature engineering, model training, deployment, and monitoring, on top of Iguazio's Data Science Platform. It uses the open source MLRun MLOps orchestration framework and Feature Store in combination with the open source Nuclio serverless engine as a runtime.\n",
"\n",
"The demo consists of:\n",
"1. Building and testing features from three sources (device metadata, real-time device metrics, and real-time device labels) using the feature store\n",
"2. Ingesting the data using batch (for testing) or real-time (for production)\n",
"3. Train and test the model with data from the feature-store\n",
"4. Deploying the model as part of a real-time feature engineering and inference pipeline\n",
"5. Real-time model and metrics monitoring, drift detection\n",
"1. Building and testing features from three sources (device metadata, real-time device metrics, and real-time device labels) using the feature store.\n",
"2. Ingesting the data using batch (for testing) or real-time (for production).\n",
"3. Training and testing the model with data from the feature-store.\n",
"4. Deploying the model as part of a real-time feature engineering and inference pipeline.\n",
"5. Real-time model and metrics monitoring, drift detection.\n",
"\n",
"**In this notebook:**\n",
"* [**Part 1: Create and configure an MLRun project**](#Part-1:-Create-and-configure-an-MLRun-project)\n",
"* [**Part 2: Define and test the feature engineering pipeline**](#Part-2:-Define-and-test-the-feature-engineering-pipelines)\n",
"* [**Part 3: Ingest the features data using batch or real-time**](#Part-3:-Ingest-the-features-data-using-batch-or-real-time)\n",
"\n",
"in the next [**02-training-and-deployment**](./02-training-and-deployment.ipynb) notebook you can learn how to build automated pipelines which train, test, and deploy models using data from the feature store.\n",
"In the next [**02-training-and-deployment**](./02-training-and-deployment.ipynb) notebook you can learn how to build automated pipelines that train, test, and deploy models using data from the feature store.\n",
"\n",
"**Please run the following ONCE to install the required packages, and restart the notebook kernel right after:**"
"**Run the following ONCE to install the required packages, and then immediately restart the notebook kernel:**"
]
},
{
Expand Down Expand Up @@ -86,12 +86,12 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define project parameters, artifacts, and functions\n",
"Throughout the project, we will require some files, configurations, streams, etc. to be shared with the different functions. \n",
"MLRun allows us to easily do this by using the project parameters (`project.params`) or using artifacts which are available to all the functions in the project. \n",
"### Define the project parameters, artifacts, and functions\n",
"Throughout the project, there are files, configurations, streams, etc. that are shared with the different functions. \n",
"Sharing is simplified by using the MLRun project parameters (`project.params`) or the artifacts, which are available to all the functions in the project. \n",
"\n",
"We will define 2 stream paths for the network-device-metrics and network-device-labels, and a network-device static data KV table for our feature sets. \n",
"We will also log the parameters for the metrics generator (simulator) which will be used by different functions in the project.\n",
"Define two stream paths for the network-device-metrics and network-device-labels, and a network-device static data KV table for the feature sets. \n",
"You'll also log the parameters for the metrics generator (simulator), which are used by different functions in the project.\n",
"\n",
"The functions/code used in the project are registered (using `set_function`) allowing reference to code/functions by name and CI/CD pipelines."
]
Expand All @@ -117,7 +117,7 @@
"device_metrics_stream = f'v3io:///projects/{project.name}/streams/{device_metrics_fs_name}'\n",
"device_labels_stream = f'v3io:///projects/{project.name}/streams/{device_labels_fs_name}'\n",
"\n",
"# Set the configuration and stream paths as a project parameters so it could be picked up by the functions\n",
"# Set the configuration and stream paths as a project parameters so they can be picked up by the functions\n",
"project.params = {\n",
" 'metrics_configuration_uri': metrics_configuration_uri,\n",
" 'device_metrics_stream': device_metrics_stream,\n",
Expand Down Expand Up @@ -146,24 +146,24 @@
"source": [
"## Part 2: Define and test the feature engineering pipelines\n",
"\n",
"Our model is using 2 input datasets and target labels dataset (y):\n",
"This model uses two input datasets and one target labels dataset (y):\n",
"* Static data and metadata per device (location, model, etc.)\n",
"* Real-time metrics per device (CPU and memory usage, throughput, latency, etc.)\n",
"* Real-time Labels (indications for device failures/errors, etc..) \n",
"* Real-time Labels (indications for device failures/errors, etc.) \n",
"\n",
"We use the feature store to create 3 different **feature sets** (one per dataset) and apply various aggregations and transformations "
"Use the feature store to create three different **feature sets** (one per dataset) and apply various aggregations and transformations "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get data sample (simulated) for developing the features\n",
"We use the `v3io_generator` simulator to generate fake device and metrics data with real-world behavior, the exact data schema and patterns are defined in the [**metrics configuration file**](./src/metric_configurations.yaml), it can be adjusted to simulate your specific features and time-series metrics.\n",
"### Get the data sample (simulated) for developing the features\n",
"Use the `v3io_generator` simulator to generate fake device and metrics data with real-world behavior. The exact data schema and patterns are defined in the [**metrics configuration file**](./src/metric_configurations.yaml). It can be adjusted to simulate your specific features and time-series metrics.\n",
"\n",
"The [**generator function**](./src/generator.py) is imported locally (for testing), later in the notebook we will demonstrate how it can also be deployed as a real-time Nuclio function which continuously sends real-time metrics and labels over streams (for testing real-time ingestion/processing).\n",
"The [**generator function**](./src/generator.py) is imported locally (for testing). Later in the notebook you'll see how it can also be deployed as a real-time Nuclio function that continuously sends real-time metrics and labels over streams (for testing real-time ingestion/processing).\n",
"\n",
"We use the generator and retrieve sample data (Static devices dataset, Real-time metrics, Real-time Labels) for developing and testing our features."
"Use the generator and retrieve sample data (Static devices dataset, Real-time metrics, Real-time Labels) for developing and testing our features."
]
},
{
Expand All @@ -190,10 +190,10 @@
"metadata": {},
"source": [
"### Feature Set 1 - Static Network Devices data/metadata \n",
"This feature set will hold all the static network-device data. As such it will be represented and ingested as a table. \n",
"Since such static data usually holds a lot of categorical properties about the device, we will use a `one hot encoder` to encode our categorical features to a supported format and We will save this dataset results to a No-Sql (Key Value) table. \n",
"This feature set holds all the static network-device data. As such, it will be represented and ingested as a table. \n",
"Since such static data usually holds a lot of categorical properties about the device, use a `one hot encoder` to encode the categorical features to a supported format, and then save this dataset result to a No-Sql (Key Value) table. \n",
"\n",
"After defining the feature set we will plot it's computational graph so we can easily review it in the notebook, preview it to test the wanted results and deploy it."
"After defining the feature set, plot its computational graph so you can easily review it in the notebook, preview it to test the wanted results, and deploy it."
]
},
{
Expand Down Expand Up @@ -279,7 +279,7 @@
" entities=['device'], \n",
" description='Static data for the devices')\n",
"\n",
"# Append the one hot encoder to the feature set's processing graph, use values from configuration\n",
"# Append the one hot encoder to the feature set's processing graph, use values from the configuration\n",
"static_fs.graph.to(OneHotEncoder(mapping=metrics_configuration['static']))\n",
"\n",
"# Set the default targets for the feature set (parquet & no-sql)\n",
Expand Down Expand Up @@ -492,9 +492,9 @@
],
"source": [
"# Preview the feature sets computation results\n",
"# (this is how the data will be finally saved to our feature store)\n",
"# (this is how the data will be finally saved to the feature store)\n",
"# * The preview also serves to infer the schema of the feature set \n",
"# * which could be later saved with the feature set.\n",
"# * that could be saved later with the feature set.\n",
"fstore.preview(static_fs, static_df).head()"
]
},
Expand All @@ -519,7 +519,7 @@
"- Packet Loss\n",
"- Latency\n",
"\n",
"This feature set represents ingestion of real-time timeseries data via an incoming data stream. On top of this time series data we will perform rolling aggregations of `mean`, `min`, `max` on top of `1 hour` and `6 hours` windows.\n"
"This feature set represents ingestion of real-time timeseries data via an incoming data stream. This code defines rolling aggregations of `mean`, `min`, `max` on top of `1 hour` and `6 hours` windows.\n"
]
},
{
Expand Down Expand Up @@ -891,7 +891,7 @@
}
],
"source": [
"# We preview and test the calculated features before the deployment \n",
"# Preview and test the calculated features before the deployment \n",
"fstore.preview(device_metrics_set, metrics_df).head()"
]
},
Expand All @@ -910,7 +910,7 @@
"metadata": {},
"source": [
"### Feature Set 3 - Network Device Labels\n",
"This feature set represents incoming failure labels for our devices. The labels are ingested through their own stream to simulate them arriving through a different process and apply the asof merging with the related metrics through the feature store.\n"
"This feature set represents incoming failure labels for our devices. The labels are ingested through their own stream to simulate their arrival through a different process and apply the asof merging with the related metrics through the feature store.\n"
]
},
{
Expand Down Expand Up @@ -989,7 +989,7 @@
" operations=['max'], \n",
" windows=['1h'], period='10m')\n",
"\n",
"# specify only Parquet (offline) target since its not used for real-time\n",
"# specify only Parquet (offline) target since it's not used for real-time\n",
"device_labels_set.set_targets(['parquet'], with_defaults=False)\n",
"device_labels_set.plot(with_targets=True)"
]
Expand Down Expand Up @@ -1154,12 +1154,12 @@
"\n",
"## Part 3: Ingest the features data using batch or real-time\n",
"\n",
"In order to use the features in training or serving we need to ingest the data into the feature store, there are 3 ways we can use:\n",
"To use the features in training or serving the data must be ingested into the feature store, there are three ways to ingest data:\n",
"1. Direct ingestion - ingest the data directly from the client/notebook (interactively) \n",
"2. Batch/scheduled ingestion - create a service/job which will ingest data from the source (e.g. file, DB, ..)\n",
"3. Real-time/Streaming ingestion - create an online service which accepts real-time events (from a stream, http, etc.) and push them into the feature store\n",
"2. Batch/scheduled ingestion - create a service/job whithat that ingests data from the source (e.g. file, DB, ..)\n",
"3. Real-time/Streaming ingestion - create an online service that accepts real-time events (from a stream, http, etc.) and push them into the feature store\n",
"\n",
"Direct and batch ingestion are achieved using the `ingest()` method, while real-time ingestion is done using the `deploy_ingestion_service()` method, we will demonstrate both methods in the following sections, the direct ingestion is great for development and testing while the real-time ingestion is mainly used in production."
"Direct and batch ingestion are achieved using the `ingest()` method, while real-time ingestion is done using the `deploy_ingestion_service()` method. Both methods are demonstrated in the following sections. The direct ingestion is great for development and testing while the real-time ingestion is mainly used in production."
]
},
{
Expand All @@ -1168,13 +1168,13 @@
"source": [
"### Direct/batch ingestion of the sample data \n",
"\n",
"In order to run training or test our serving we need to ingest and transform the input datasets and store the results in the feature store, the simplest way is to use the `ingest()` method and specify the feature-set and the source (Dataframe, file, etc.).\n",
"To run training or test the serving you need to ingest and transform the input datasets and store the results in the feature store. The simplest way is to use the `ingest()` method and specify the feature-set and the source (Dataframe, file, etc.).\n",
"\n",
"We can specify the desired target if we want to overwrite the default behaviour, e.g. set `targets=[ParquetTarget()]` to specify that the data will only be written to parquet files and will not be written to the NoSQL DB (meaning you cannot run real-time serving)\n",
"You can specify the desired target if you want to overwrite the default behavior. For example, set `targets=[ParquetTarget()]` to specify that the data will only be written to parquet files and will not be written to the NoSQL DB (meaning you cannot run real-time serving).\n",
"\n",
"The `ingest()` method have many other args/options, see the documentation for details.\n",
"The `ingest()` method has many other args/options, see the documentation for details.\n",
"\n",
"**Once the data is ingested we can run next [02-training-and-deployment](./02-training-and-deployment.ipynb) notebook**"
"**Once the data is ingested you can run next [02-training-and-deployment](./02-training-and-deployment.ipynb) notebook**"
]
},
{
Expand Down Expand Up @@ -1203,24 +1203,24 @@
"\n",
"### Real-time ingestion\n",
"\n",
"In production the data will arrive in real-time via a stream, the ingestion service will use real-time Nuclio functions which listen on the event stream or HTTP endpoint and ingest the data while running the set of real-time transformations and aggregations.\n",
"In production the data arrives in real-time via a stream, the ingestion service uses real-time Nuclio functions that listen on the event stream or HTTP endpoint and ingest the data while running the set of real-time transformations and aggregations.\n",
"\n",
"To simulate the real-time streams, we create a real-time generator function which generates semi-random data and write it into streams, the feature ingestion services will read from those streams, transform the data and write the results in parallel into offline (parquet files) and online (NoSQL DB) data targets."
"To simulate the real-time streams, create a real-time generator function that generates semi-random data and writes it into streams. The feature ingestion services reads from those streams, transforms the data and writes the results in parallel into offline (parquet files) and online (NoSQL DB) data targets."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create the real-time generator function\n",
"We deploy the [**metrics generator function**](src/generator.py) which generate the following simulated data: \n",
"- Static table with all the network devices, their model and manufacturing country\n",
"- Metrics stream with telemtry data\n",
"- Labels stream with labels for the device status\n",
"Deploy the [**metrics generator function**](src/generator.py) that generates the following simulated data: \n",
"- Static table with all the network devices, their model, and manufacturing country\n",
"- Metrics stream with telemetry data\n",
"- Labels the streams with labels for the device status\n",
"\n",
"The description and statistics for the simulated data are defined in the [**metrics configuration file**](./src/metric_configurations.yaml).\n",
"\n",
"The generator function will be deployed as a Nuclio function on top of our Kubernetes cluster and will be called every 1 minute as defined via the cron trigger to produce new data and push it to the relevant streams. Later on, MLRun's feature store ingestion functions will listen to these streams and tables and ingest the data to the feature store. "
"The generator function is deployed as a Nuclio function on top of the Kubernetes cluster and is called every one minute as defined via the cron trigger to produce new data and push it to the relevant streams. Later on, MLRun's feature store ingestion functions listen to these streams and tables and ingest the data to the feature store. "
]
},
{
Expand Down Expand Up @@ -1277,7 +1277,7 @@
"source": [
"#### Deploy the device metrics feature set ingestion endpoint\n",
"\n",
"Next we deploy the device metrics feature set processing pipeline over real-time serverless (Nuclio) function, we specify the source as a `StreamSource` with the path to the `device_metrics_stream`, and can specify which fields are used to determine the index and timestamp key."
"Next, deploy the device metrics feature set processing pipeline over real-time serverless (Nuclio) function. Specify the source as a `StreamSource` with the path to the `device_metrics_stream`, and you can specify which fields are used to determine the index and timestamp key."
]
},
{
Expand Down Expand Up @@ -1316,7 +1316,7 @@
"metadata": {},
"source": [
"#### Deploy device labels endpoint\n",
"We deploy the device labels feature set processing pipeline over real-time serverless (Nuclio) function, we specify the source as a `StreamSource` with the path to the `device_labels_stream`, and can specify which fields are used to determine the index and timestamp key."
"Deploy the device labels feature set processing pipeline over real-time serverless (Nuclio) function: specify the source as a `StreamSource` with the path to the `device_labels_stream`, and you can specify which fields are used to determine the index and timestamp key."
]
},
{
Expand Down
Loading

0 comments on commit 55d2504

Please sign in to comment.