Skip to content

Commit

Permalink
Add Flow.ato_deployment, Flow.afrom_source, and `Runner.afrom_sto…
Browse files Browse the repository at this point in the history
…rage` (#16897)
  • Loading branch information
desertaxle authored Jan 30, 2025
1 parent 0b7c511 commit 38700d8
Show file tree
Hide file tree
Showing 7 changed files with 985 additions and 250 deletions.
121 changes: 117 additions & 4 deletions src/prefect/deployments/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def fast_flow():
from rich.table import Table

from prefect._experimental.sla.objects import SlaTypes
from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect._internal.concurrency.api import create_call, from_async
from prefect._internal.schemas.validators import (
reconcile_paused_deployment,
Expand Down Expand Up @@ -82,7 +83,7 @@ def fast_flow():
)
from prefect.types import ListOfNonEmptyStrings
from prefect.types.entrypoint import EntrypointType
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible
from prefect.utilities.callables import ParameterSchema, parameter_schema
from prefect.utilities.collections import get_from_dict, isiterable
from prefect.utilities.dockerutils import (
Expand Down Expand Up @@ -717,8 +718,7 @@ def from_entrypoint(
return deployment

@classmethod
@sync_compatible
async def from_storage(
async def afrom_storage(
cls,
storage: RunnerStorage,
entrypoint: str,
Expand All @@ -742,7 +742,7 @@ async def from_storage(
work_queue_name: Optional[str] = None,
job_variables: Optional[dict[str, Any]] = None,
_sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental
):
) -> "RunnerDeployment":
"""
Create a RunnerDeployment from a flow located at a given entrypoint and stored in a
local storage location.
Expand Down Expand Up @@ -831,6 +831,119 @@ async def from_storage(

return deployment

@classmethod
@async_dispatch(afrom_storage)
def from_storage(
cls,
storage: RunnerStorage,
entrypoint: str,
name: str,
flow_name: Optional[str] = None,
interval: Optional[
Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
] = None,
cron: Optional[Union[Iterable[str], str]] = None,
rrule: Optional[Union[Iterable[str], str]] = None,
paused: Optional[bool] = None,
schedules: Optional["FlexibleScheduleList"] = None,
concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
parameters: Optional[dict[str, Any]] = None,
triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
version: Optional[str] = None,
enforce_parameter_schema: bool = True,
work_pool_name: Optional[str] = None,
work_queue_name: Optional[str] = None,
job_variables: Optional[dict[str, Any]] = None,
_sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental
) -> "RunnerDeployment":
"""
Create a RunnerDeployment from a flow located at a given entrypoint and stored in a
local storage location.
Args:
entrypoint: The path to a file containing a flow and the name of the flow function in
the format `./path/to/file.py:flow_func_name`.
name: A name for the deployment
flow_name: The name of the flow to deploy
storage: A storage object to use for retrieving flow code. If not provided, a
URL must be provided.
interval: An interval on which to execute the current flow. Accepts either a number
or a timedelta object. If a number is given, it will be interpreted as seconds.
cron: A cron schedule of when to execute runs of this flow.
rrule: An rrule schedule of when to execute runs of this flow.
triggers: A list of triggers that should kick of a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's
description if not provided.
tags: A list of tags to associate with the created deployment for organizational
purposes.
version: A version for the created deployment. Defaults to the flow's version.
enforce_parameter_schema: Whether or not the Prefect API should enforce the
parameter schema for this deployment.
work_pool_name: The name of the work pool to use for this deployment.
work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
If not provided the default work queue for the work pool will be used.
job_variables: Settings used to override the values specified default base job template
of the chosen work pool. Refer to the base job template of the chosen work pool for
available settings.
_sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
"""
from prefect.flows import load_flow_from_entrypoint

constructed_schedules = cls._construct_deployment_schedules(
interval=interval,
cron=cron,
rrule=rrule,
schedules=schedules,
)

if isinstance(concurrency_limit, ConcurrencyLimitConfig):
concurrency_options = {
"collision_strategy": concurrency_limit.collision_strategy
}
concurrency_limit = concurrency_limit.limit
else:
concurrency_options = None

job_variables = job_variables or {}

with tempfile.TemporaryDirectory() as tmpdir:
storage.set_base_path(Path(tmpdir))
run_coro_as_sync(storage.pull_code())

full_entrypoint = str(storage.destination / entrypoint)
flow = load_flow_from_entrypoint(full_entrypoint)

deployment = cls(
name=Path(name).stem,
flow_name=flow_name or flow.name,
schedules=constructed_schedules,
concurrency_limit=concurrency_limit,
concurrency_options=concurrency_options,
paused=paused,
tags=tags or [],
triggers=triggers or [],
parameters=parameters or {},
description=description,
version=version,
entrypoint=entrypoint,
enforce_parameter_schema=enforce_parameter_schema,
storage=storage,
work_pool_name=work_pool_name,
work_queue_name=work_queue_name,
job_variables=job_variables,
)
deployment._sla = _sla
deployment._path = str(storage.destination).replace(
tmpdir, "$STORAGE_BASE_PATH"
)

cls._set_defaults_from_flow(deployment, flow)

return deployment


@sync_compatible
async def deploy(
Expand Down
Loading

0 comments on commit 38700d8

Please sign in to comment.