Skip to content

Commit

Permalink
[Serving] Adding support for secrets in serving runtime (mlrun#769)
Browse files Browse the repository at this point in the history
  • Loading branch information
theSaarco authored Mar 8, 2021
1 parent 8a80f65 commit a5886ec
Show file tree
Hide file tree
Showing 13 changed files with 440 additions and 101 deletions.
12 changes: 10 additions & 2 deletions hack/local/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,17 @@ Vault needs to be installed in your environment. This document does not cover th
refer to [**Vault documentation**](https://www.vaultproject.io/docs/install) for instructions.

Once Vault is running locally, you need to enable Kubernetes authentication for vault. Follow the instructions in [**Kubernetes auth method**](https://www.vaultproject.io/docs/auth/kubernetes) to enable it.
This involves setting up a service account with `TokenReview` permissions - the page linked above explains how to configure that properly.
This involves setting up a service account with `TokenReview` permissions - follow the instructions in
[**Vault Agent with Kubernetes**](https://learn.hashicorp.com/tutorials/vault/agent-kubernetes) - follow
the steps for service-account creation and configuring Vault to use it.

#### Configuring MLRun API service to work with Vault

> **Note:**
> By default, the MLRun API uses a service-account that does not have permissions to perform list of serviceaccounts.
> This permission is required for the Vault functionality, so you'll need to manually edit the `mlrun-api` role and
> add `serviceaccounts` to the list of APIs that are permitted.
The MLRun API service needs to have permissions to manipulate Vault k8s roles, Vault policies and of course handle secrets. The Vault authentication
method within MLRun uses a JWT token to authenticate using k8s mode. The JWT token can be placed in one of two locations:
Expand All @@ -137,7 +145,7 @@ We will use the 1st approach here - to configure the MLRun API pod, perform the
For example, If using Minikube on Mac and you're running Vault on your local laptop then it
should be set to `http://docker.for.mac.localhost:8200`. If you're running Vault on another
pod then the URL should just be the DNS name of that pod.
2. Set the Vault role for the pod to be `user:mlrun-api`. The role is specified through the `MLRUN__SECRET_STORES__VAULT__ROLE` environment variable
2. Set the Vault role for the pod to be `user:mlrun-api`. The role is specified through the `MLRUN_SECRET_STORES__VAULT__ROLE` environment variable
(which maps to the MLRun config parameter `secret_stores.vault.role`).
3. Set a service-account name for the MLRun API pod. By default the service account used is `mlrun-api`, and this is
the value used in the rest of this example. If you modify the service account name, make sure you modify the rest of the steps accordingly.
Expand Down
9 changes: 9 additions & 0 deletions mlrun/runtimes/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ def add_v3io_stream_trigger(
self.spec.min_replicas = shards
self.spec.max_replicas = shards

def add_vault_config_to_spec(self):
# Currently secrets are only handled in Serving runtime.
pass

def deploy(
self, dashboard="", project="", tag="", verbose=False,
):
Expand Down Expand Up @@ -546,6 +550,11 @@ def get_fullname(name, project, tag):

def deploy_nuclio_function(function: RemoteRuntime, dashboard="", watch=False):
function.set_config("metadata.labels.mlrun/class", function.kind)

# Add vault configurations to function's pod spec, if vault secret source was added.
# Needs to be here, since it adds env params, which are handled in the next lines.
function.add_vault_config_to_spec()

env_dict = {get_item_name(v): get_item_name(v, "value") for v in function.spec.env}
for key, value in function._get_runtime_env().items():
env_dict[key] = value
Expand Down
43 changes: 0 additions & 43 deletions mlrun/runtimes/kubejob.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,49 +216,6 @@ def deploy_step(
skip_deployed=skip_deployed,
)

def _add_vault_params_to_spec(self, runobj):
from ..config import config as mlconf

project_name = runobj.metadata.project
service_account_name = mlconf.secret_stores.vault.project_service_account_name.format(
project=project_name
)

project_vault_secret_name = self._get_k8s().get_project_vault_secret_name(
project_name, service_account_name
)
if project_vault_secret_name is None:
logger.info(f"No vault secret associated with project {project_name}")
return

volumes = [
{
"name": "vault-secret",
"secret": {"defaultMode": 420, "secretName": project_vault_secret_name},
}
]
# We cannot use expanduser() here, since the user in question is the user running in the pod
# itself (which is root) and not where this code is running. That's why this hacky replacement is needed.
token_path = mlconf.secret_stores.vault.token_path.replace("~", "/root")

volume_mounts = [{"name": "vault-secret", "mountPath": token_path}]

self.spec.update_vols_and_mounts(volumes, volume_mounts)
self.spec.env.append(
{
"name": "MLRUN_SECRET_STORES__VAULT__ROLE",
"value": f"project:{project_name}",
}
)
# In case remote URL is different than local URL, use it. Else, use the local URL
vault_url = mlconf.secret_stores.vault.remote_url
if vault_url == "":
vault_url = mlconf.secret_stores.vault.url

self.spec.env.append(
{"name": "MLRUN_SECRET_STORES__VAULT__URL", "value": vault_url}
)

def _run(self, runobj: RunObject, execution):

with_mlrun = (not self.spec.mode) or (self.spec.mode != "pass")
Expand Down
49 changes: 48 additions & 1 deletion mlrun/runtimes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
get_resource_labels,
generate_resources,
)
from ..utils import normalize_name, update_in, verify_field_regex
from ..utils import normalize_name, update_in, verify_field_regex, logger
from .base import BaseRuntime, FunctionSpec


Expand Down Expand Up @@ -246,3 +246,50 @@ def copy(self):
self._cop = ContainerOp("name", "image")
fn._cop = ContainerOp("name", "image")
return fn

def _add_vault_params_to_spec(self, runobj=None, project=None):
from ..config import config as mlconf

project_name = project or runobj.metadata.project
if project_name is None:
logger.warning("No project provided. Cannot add vault parameters")
return

service_account_name = mlconf.secret_stores.vault.project_service_account_name.format(
project=project_name
)

project_vault_secret_name = self._get_k8s().get_project_vault_secret_name(
project_name, service_account_name
)
if project_vault_secret_name is None:
logger.info(f"No vault secret associated with project {project_name}")
return

volumes = [
{
"name": "vault-secret",
"secret": {"defaultMode": 420, "secretName": project_vault_secret_name},
}
]
# We cannot use expanduser() here, since the user in question is the user running in the pod
# itself (which is root) and not where this code is running. That's why this hacky replacement is needed.
token_path = mlconf.secret_stores.vault.token_path.replace("~", "/root")

volume_mounts = [{"name": "vault-secret", "mountPath": token_path}]

self.spec.update_vols_and_mounts(volumes, volume_mounts)
self.spec.env.append(
{
"name": "MLRUN_SECRET_STORES__VAULT__ROLE",
"value": f"project:{project_name}",
}
)
# In case remote URL is different than local URL, use it. Else, use the local URL
vault_url = mlconf.secret_stores.vault.remote_url
if vault_url == "":
vault_url = mlconf.secret_stores.vault.url

self.spec.env.append(
{"name": "MLRUN_SECRET_STORES__VAULT__URL", "value": vault_url}
)
49 changes: 49 additions & 0 deletions mlrun/runtimes/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ..model import ObjectList
from .function import RemoteRuntime, NuclioSpec
from .function_reference import FunctionReference
from ..secrets import SecretsStore
from ..utils import logger, get_caller_globals
from ..serving.server import create_graph_server, GraphServer
from ..serving.states import (
Expand Down Expand Up @@ -102,6 +103,7 @@ def __init__(
graph_initializer=None,
error_stream=None,
track_models=None,
secret_sources=None,
):

super().__init__(
Expand Down Expand Up @@ -140,6 +142,7 @@ def __init__(
self.graph_initializer = graph_initializer
self.error_stream = error_stream
self.track_models = track_models
self.secret_sources = secret_sources or []

@property
def graph(self) -> Union[RouterState, RootFlowState]:
Expand Down Expand Up @@ -349,13 +352,42 @@ def _deploy_function_refs(self):
function_object.apply(mlrun.v3io_cred())
function_ref.db_uri = function_object._function_uri()
function_object.verbose = self.verbose
function_object.spec.secret_sources = self.spec.secret_sources
function_object.deploy()

def remove_states(self, keys: list):
"""remove one, multiple, or all states/models from the spec (blank list for all)"""
if self.spec.graph:
self.spec.graph.clear_children(keys)

def with_secrets(self, kind, source):
"""register a secrets source (file, env or dict)
read secrets from a source provider to be used in workflows, example::
task.with_secrets('file', 'file.txt')
task.with_secrets('inline', {'key': 'val'})
task.with_secrets('env', 'ENV1,ENV2')
task.with_secrets('vault', ['secret1', 'secret2'...])
:param kind: secret type (file, inline, env)
:param source: secret data or link (see example)
:returns: The Runtime (function) object
"""

if kind == "vault" and isinstance(source, list):
source = {"project": self.metadata.project, "secrets": source}

self.spec.secret_sources.append({"kind": kind, "source": source})
return self

def add_vault_config_to_spec(self):
if self.spec.secret_sources:
self._secrets = SecretsStore.from_list(self.spec.secret_sources)
if self._secrets.has_vault_source():
self._add_vault_params_to_spec(project=self.metadata.project)

def deploy(self, dashboard="", project="", tag="", verbose=False):
"""deploy model serving function to a local/remote cluster
Expand All @@ -374,16 +406,26 @@ def deploy(self, dashboard="", project="", tag="", verbose=False):
# initialize or create required streams/queues
self.spec.graph.check_and_process_graph()
self.spec.graph.init_queues()

# Handle secret processing before handling child functions, since secrets are transferred to them
if self.spec.secret_sources:
# Before passing to remote builder, secrets values must be retrieved (for example from ENV)
# and stored as inline secrets. Otherwise, they will not be available to the builder.
self._secrets = SecretsStore.from_list(self.spec.secret_sources)
self.spec.secret_sources = self._secrets.to_serial()

if self._spec.function_refs:
# deploy child functions
self._add_ref_triggers()
self._deploy_function_refs()
logger.info(f"deploy root function {self.metadata.name} ...")

return super().deploy(dashboard, project, tag, verbose=verbose)

def _get_runtime_env(self):
env = super()._get_runtime_env()
function_name_uri_map = {f.name: f.uri(self) for f in self.spec.function_refs}

serving_spec = {
"function_uri": self._function_uri(),
"version": "v2",
Expand All @@ -395,6 +437,11 @@ def _get_runtime_env(self):
"error_stream": self.spec.error_stream,
"track_models": self.spec.track_models,
}

if self.spec.secret_sources:
self._secrets = SecretsStore.from_list(self.spec.secret_sources)
serving_spec["secret_sources"] = self._secrets.to_serial()

env["SERVING_SPEC_ENV"] = json.dumps(serving_spec)
return env

Expand All @@ -407,6 +454,7 @@ def to_mock_server(
:param log_level: log level (error | info | debug)
:param current_function: specify if you want to simulate a child function
"""

server = create_graph_server(
parameters=self.spec.parameters,
load_mode=self.spec.load_mode,
Expand All @@ -416,6 +464,7 @@ def to_mock_server(
graph_initializer=self.spec.graph_initializer,
track_models=self.spec.track_models,
function_uri=self._function_uri(),
secret_sources=self.spec.secret_sources,
**kwargs,
)
server.init(None, namespace or get_caller_globals(), logger=logger)
Expand Down
10 changes: 8 additions & 2 deletions mlrun/serving/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(
graph_initializer=None,
error_stream=None,
track_models=None,
secret_sources=None,
):
self._graph = None
self.graph: Union[RouterState, RootFlowState] = graph
Expand All @@ -92,7 +93,8 @@ def __init__(
self.error_stream = error_stream
self.track_models = track_models
self._error_stream_object = None
self._secrets = SecretsStore()
self.secret_sources = secret_sources
self._secrets = SecretsStore.from_list(secret_sources)
self._db_conn = None
self.resource_cache = None

Expand Down Expand Up @@ -124,9 +126,13 @@ def init(
):
"""for internal use, initialize all states (recursively)"""

if self.secret_sources:
self._secrets = SecretsStore.from_list(self.secret_sources)

if self.error_stream:
self._error_stream_object = get_stream_pusher(self.error_stream)
self.resource_cache = resource_cache or ResourceCache()

context = GraphContext(server=self, nuclio_context=context, logger=logger)

context.stream = _StreamContext(
Expand Down Expand Up @@ -343,7 +349,7 @@ def get_param(self, key: str, default=None):

def get_secret(self, key: str):
if self._server and self._server._secrets:
return self._secrets.get(key)
return self._server._secrets.get(key)
return None


Expand Down
9 changes: 4 additions & 5 deletions mlrun/utils/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,8 @@ def _api_call(self, method, url, data=None):

headers = {"X-Vault-Token": self._token}
full_url = self.url + "/" + url
if data:
data = json.dumps(data)

response = requests.request(method, full_url, headers=headers, data=data)
response = requests.request(method, full_url, headers=headers, json=data)

if not response:
logger.error(
Expand Down Expand Up @@ -145,6 +143,7 @@ def _safe_login_with_jwt_token(self, role):
def get_secrets(self, keys, user=None, project=None):
secret_path = VaultStore._generate_path(user=user, project=project)
secrets = {}

response = self._api_call("GET", secret_path)

if not response:
Expand Down Expand Up @@ -192,10 +191,10 @@ def create_project_policy(self, project):
policy_str = (
f'path "secret/data/mlrun/projects/{project}" {{\n'
+ ' capabilities = ["read", "list", "create", "delete", "update"]\n'
+ "}}\n"
+ "}\n"
+ f'path "secret/data/mlrun/projects/{project}/*" {{\n'
+ ' capabilities = ["read", "list", "create", "delete", "update"]\n'
+ "}}"
+ "}"
)

data_object = {"policy": policy_str}
Expand Down
15 changes: 15 additions & 0 deletions tests/api/runtimes/assets/serving_child_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from copy import copy


class ChildChain:
def __init__(self, context, name=None, secret=None):
self.context = context
self.name = name
self.secret_key = secret

def do(self, x):
x = copy(x)
secret_value = self.context.get_secret(self.secret_key)

x.append({f"child:{self.secret_key}": secret_value})
return x
15 changes: 15 additions & 0 deletions tests/api/runtimes/assets/serving_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from copy import copy


class Chain:
def __init__(self, context, name=None, secret=None):
self.context = context
self.name = name
self.secret_name = secret

def do(self, x):
x = copy(x)
secret_value = self.context.get_secret(self.secret_name)

x.append({self.secret_name: secret_value})
return x
Loading

0 comments on commit a5886ec

Please sign in to comment.