Skip to content

Commit

Permalink
Merge pull request #552 from aviaIguazio/1.6.x-dev
Browse files Browse the repository at this point in the history
[ML-5454] update feature store new api methods
  • Loading branch information
aviaIguazio authored Jan 7, 2024
2 parents 5f0275d + cc3db52 commit e8f45f6
Show file tree
Hide file tree
Showing 11 changed files with 23 additions and 27 deletions.
10 changes: 5 additions & 5 deletions fraud-prevention-feature-store/01-ingest-datasources.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@
],
"source": [
"# Ingest your transactions dataset through your defined pipeline\n",
"transactions_df = fstore.ingest(transaction_set, transactions_data, \n",
"transactions_df = transaction_set.ingest(transactions_data,\n",
" infer_options=fstore.InferOptions.default())\n",
"\n",
"transactions_df.head(3)"
Expand Down Expand Up @@ -1033,7 +1033,7 @@
],
"source": [
"# Ingestion of your newly created events feature set\n",
"events_df = fstore.ingest(user_events_set, user_events_data)\n",
"events_df = user_events_set.ingest(user_events_data)\n",
"events_df.head(3)"
]
},
Expand Down Expand Up @@ -1226,7 +1226,7 @@
],
"source": [
"# Ingest the labels feature set\n",
"labels_df = fstore.ingest(labels_set, transactions_data)\n",
"labels_df = labels_set.ingest(transactions_data)\n",
"labels_df.head(3)"
]
},
Expand Down Expand Up @@ -1298,7 +1298,7 @@
"\n",
"# Deploy the transactions feature set's ingestion service over a real-time (Nuclio) serverless function\n",
"# you can use the run_config parameter to pass function/service specific configuration\n",
"transaction_set_endpoint, function = fstore.deploy_ingestion_service_v2(featureset=transaction_set, source=source)"
"transaction_set_endpoint, function = transaction_set.deploy_ingestion_service(source=source)"
]
},
{
Expand Down Expand Up @@ -1429,7 +1429,7 @@
"\n",
"# Deploy the transactions feature set's ingestion service over a real-time (Nuclio) serverless function\n",
"# you can use the run_config parameter to pass function/service specific configuration\n",
"events_set_endpoint, function = fstore.deploy_ingestion_service_v2(featureset=user_events_set, source=source)"
"events_set_endpoint, function = user_events_set.deploy_ingestion_service(source=source)"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
"from mlrun.datastore.targets import ParquetTarget\n",
"\n",
"# Get offline feature vector as dataframe and save the dataset to parquet\n",
"train_dataset = fstore.get_offline_features(fv_name, target=ParquetTarget())"
"train_dataset = fstore.FeatureVector.get_offline_features(fv_name, target=ParquetTarget())"
]
},
{
Expand Down
16 changes: 7 additions & 9 deletions network-operations/01-ingest.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@
"2. Batch/scheduled ingestion - create a service/job that ingests data from the source (e.g. file, DB, ..)\n",
"3. Real-time/Streaming ingestion - create an online service that accepts real-time events (from a stream, http, etc.) and push them into the feature store\n",
"\n",
"Direct and batch ingestion are achieved using the `ingest()` method, while real-time ingestion is done using the `deploy_ingestion_service_v2()` method. Both methods are demonstrated in the following sections. The direct ingestion is great for development and testing while the real-time ingestion is mainly used in production."
"Direct and batch ingestion are achieved using the `ingest()` method, while real-time ingestion is done using the `deploy_ingestion_service()` method. Both methods are demonstrated in the following sections. The direct ingestion is great for development and testing while the real-time ingestion is mainly used in production."
]
},
{
Expand Down Expand Up @@ -1175,13 +1175,13 @@
"from mlrun.datastore.targets import ParquetTarget\n",
"\n",
"# ingest the static device data\n",
"fstore.ingest(static_fs, static_df)\n",
"static_fs.ingest(static_df)\n",
"\n",
"# ingest the device metrics\n",
"fstore.ingest(device_metrics_set, metrics_df)\n",
"device_metrics_set.ingest(metrics_df)\n",
"\n",
"# ingest the labels\n",
"_ = fstore.ingest(device_labels_set, labels_df, targets=[ParquetTarget()])"
"_ = device_labels_set.ingest(labels_df, targets=[ParquetTarget()])"
]
},
{
Expand Down Expand Up @@ -1292,8 +1292,7 @@
"\n",
"# Deploy the transactions feature set's ingestion service using the feature set\n",
"# and all the defined resources above.\n",
"device_metrics_set_endpoint, function = fstore.deploy_ingestion_service_v2(\n",
" featureset=device_metrics_set, source=source)"
"device_metrics_set_endpoint, function = device_metrics_set.deploy_ingestion_service(source=source)"
]
},
{
Expand Down Expand Up @@ -1329,8 +1328,7 @@
"source = mlrun.datastore.sources.StreamSource(path=device_labels_stream , key_field='device', time_field='timestamp')\n",
"\n",
"# Deploy the transactions feature set's ingestion service using the feature set\n",
"device_labels_set_endpoint, function = fstore.deploy_ingestion_service_v2(\n",
" featureset=device_labels_set, source=source)"
"device_labels_set_endpoint, function = device_labels_set.deploy_ingestion_service(source=source)"
]
},
{
Expand Down Expand Up @@ -1364,4 +1362,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}
2 changes: 1 addition & 1 deletion network-operations/02-training-and-deployment.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@
],
"source": [
"# Request (get or create) the offline dataset from the feature store and save to a parquet target\n",
"dataset_ref = fstore.get_offline_features(fv, target=mlrun.datastore.targets.ParquetTarget())\n",
"dataset_ref = fstore.FeatureVector.get_offline_features(fv, target=mlrun.datastore.targets.ParquetTarget())\n",
"\n",
"# Get the generated offline dataset as a pandas DataFrame\n",
"dataset = dataset_ref.to_dataframe()\n",
Expand Down
2 changes: 1 addition & 1 deletion network-operations/src/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def init_context(context):
)

static_set = fstore.get_feature_set("static")
fstore.ingest(static_set, static_deployment)
static_set.ingest(static_deployment)

setattr(context, "label_col_indicator", "error")
setattr(context, "deployment_levels", ["device"])
Expand Down
5 changes: 2 additions & 3 deletions stocks-prediction/01_ingest_news.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -986,8 +986,7 @@
"\n",
"now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')\n",
"\n",
"fstore.ingest(news_set,\n",
" pd.DataFrame.from_dict({'ticker':['AMZN'],\n",
"news_set.ingest(pd.DataFrame.from_dict({'ticker':['AMZN'],\n",
" 'Datetime': now,\n",
" 'n_stocks':number_of_stocks}),\n",
" overwrite=True)"
Expand Down Expand Up @@ -1038,7 +1037,7 @@
"run_config = fstore.RunConfig(function=function, local=False)\n",
"\n",
"# Deploying\n",
"news_set_endpoint, function = fstore.deploy_ingestion_service_v2(featureset=news_set, run_config=run_config)"
"news_set_endpoint, function = news_set.deploy_ingestion_service(run_config=run_config)"
]
},
{
Expand Down
5 changes: 2 additions & 3 deletions stocks-prediction/02_ingest_stocks.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@
"name = 'stocks-dummy'\n",
"now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')\n",
"\n",
"fstore.ingest(info_set,\n",
" pd.DataFrame.from_dict({'ticker':[name],\n",
"info_set.ingest(pd.DataFrame.from_dict({'ticker':[name],\n",
" 'Datetime': now,\n",
" 'start_delta':6,\n",
" 'end_delta':0,\n",
Expand Down Expand Up @@ -624,7 +623,7 @@
],
"source": [
"# Deploying\n",
"info_set_endpoint, function = fstore.deploy_ingestion_service_v2(featureset=info_set, run_config=run_config)"
"info_set_endpoint, function = info_set.deploy_ingestion_service(run_config=run_config)"
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion stocks-prediction/03_train_model.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@
"import datetime\n",
"start_time = datetime.datetime.now()-datetime.timedelta(59)\n",
"end_time = datetime.datetime.now()-datetime.timedelta(0)\n",
"train_dataset = fstore.get_offline_features(fv_name,start_time=start_time,end_time=end_time, timestamp_for_filtering = 'Datetime')\n",
"train_dataset = fstore.FeatureVector.get_offline_features(fv_name,start_time=start_time,end_time=end_time, timestamp_for_filtering = 'Datetime')\n",
"df = train_dataset.to_dataframe()\n",
"df"
]
Expand Down
2 changes: 1 addition & 1 deletion stocks-prediction/05_stocks_pipeline.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@
"import datetime\n",
"start_time = datetime.datetime.now()-datetime.timedelta(59)\n",
"end_time = end_time = datetime.datetime.now()-datetime.timedelta(0)\n",
"fv_data = fstore.get_offline_features(fv_name,start_time=start_time,end_time=end_time, timestamp_for_filtering = 'Datetime')\n",
"fv_data = fstore.FeatureVector.get_offline_features(fv_name,start_time=start_time,end_time=end_time, timestamp_for_filtering = 'Datetime')\n",
"fv_data.to_dataframe().head()"
]
},
Expand Down
2 changes: 1 addition & 1 deletion stocks-prediction/src/serving_stocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def preprocess(event):
end_time = datetime.datetime.now()-datetime.timedelta(event['end_time'])
seq_size = event['seq_size']

train_dataset = fstore.get_offline_features(vector_name, timestamp_for_filtering='Datetime', with_indexes=True, start_time=start_time, end_time=end_time)
train_dataset = fstore.FeatureVector.get_offline_features(vector_name, timestamp_for_filtering='Datetime', with_indexes=True, start_time=start_time, end_time=end_time)
price_cols = ['Open','High','Low','Close']
df = train_dataset.to_dataframe().reset_index(drop=False)
df.fillna(value=1,inplace=True)
Expand Down
2 changes: 1 addition & 1 deletion stocks-prediction/src/train_stocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class StocksDataset(Dataset):
def __init__(self, vector_name='stocks', seq_size=5, start_time=None, end_time=None):
start_time = datetime.datetime.now() - datetime.timedelta(start_time)
end_time = datetime.datetime.now() - datetime.timedelta(end_time)
train_dataset = fstore.get_offline_features(vector_name, timestamp_for_filtering='Datetime', with_indexes=True,
train_dataset = fstore.FeatureVector.get_offline_features(vector_name, timestamp_for_filtering='Datetime', with_indexes=True,
start_time=start_time, end_time=end_time)
price_cols = ['Open', 'High', 'Low', 'Close']
self.df = train_dataset.to_dataframe().reset_index(drop=False)
Expand Down

0 comments on commit e8f45f6

Please sign in to comment.