Skip to content

Commit

Permalink
Merge pull request #137 from fgcz/main
Browse files Browse the repository at this point in the history
app_runner 0.0.15
  • Loading branch information
leoschwarz authored Feb 6, 2025
2 parents d133121 + 5c2fc50 commit 58f1bcc
Show file tree
Hide file tree
Showing 21 changed files with 415 additions and 18 deletions.
2 changes: 1 addition & 1 deletion bfabric/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "bfabric"
description = "Python client for the B-Fabric API"
version = "1.13.18"
version = "1.13.19"
license = { text = "GPL-3.0" }
authors = [
{ name = "Christian Panse", email = "[email protected]" },
Expand Down
14 changes: 12 additions & 2 deletions bfabric_app_runner/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## \[Unreleased\]

## \[0.0.15\] - 2025-02-03
## \[0.0.15\] - 2025-02-06

### Added

- New input type `file`, which replaces `file_scp`, preserves timestamps whenever possible, and can create
symlinks instead of copying the file when needed.
- `BfabricOrderFastaSpec.required` which allows specifying whether the order fasta is required or not

### Changed

- Prepare for PyPI release, major rename.
- Better error when app version is not found.

### Fixed

- Config: Log messages are shown by default again.

## \[0.0.14\] - 2025-01-30

Expand Down
8 changes: 8 additions & 0 deletions bfabric_app_runner/docs/index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
## Install App Runner

To install the most recent released version:

```bash
uv tool install bfabric_app_runner
```

To install a development version:

```bash
uv tool install bfabric_app_runner@git+https://github.com/fgcz/bfabricPy.git@main#egg=bfabric_app_runner&subdirectory=bfabric_app_runner
```
Expand Down
4 changes: 2 additions & 2 deletions bfabric_app_runner/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ build-backend = "hatchling.build"
[project]
name = "bfabric_app_runner"
description = "Application runner for B-Fabric apps"
version = "0.0.14"
version = "0.0.15"
license = { text = "GPL-3.0" }
authors = [
{name = "Leonardo Schwarz", email = "[email protected]"},
]
requires-python = ">=3.12"
dependencies = [
"bfabric==1.13.18",
"bfabric==1.13.19",
"pydantic",
"glom",
"mako",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def resolve_app(versions: AppSpec, workunit_definition: WorkunitDefinition) -> A
raise ValueError("The workunit definition does not contain an application version.")
app_version = workunit_definition.execution.raw_parameters["application_version"]
# TODO graceful handling of invalid versions
if app_version in versions and versions[app_version] is not None:
return versions[app_version]
else:
msg = (
f"application_version '{app_version}' is not defined in the app spec,\n"
f" available versions: {sorted(versions.available_versions)}"
)
raise ValueError(msg)
return versions[app_version]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from enum import Enum

from bfabric_app_runner.specs.inputs.bfabric_order_fasta_spec import BfabricOrderFastaSpec
from bfabric_app_runner.specs.inputs.file_copy_spec import FileSpec
from bfabric_app_runner.specs.inputs.file_scp_spec import FileScpSpec
from bfabric.entities import Resource, Dataset
from bfabric_app_runner.specs.inputs.bfabric_dataset_spec import BfabricDatasetSpec # noqa: TC001
Expand Down Expand Up @@ -39,7 +40,11 @@ def check_integrity(spec: InputSpecType, local_path: Path, client: Bfabric) -> I
return _check_resource_spec(spec, local_path, client)
elif isinstance(spec, BfabricDatasetSpec):
return _check_dataset_spec(spec, local_path, client)
elif isinstance(spec, FileScpSpec) or spec.type == "bfabric_annotation" or isinstance(spec, BfabricOrderFastaSpec):
elif (
isinstance(spec, FileSpec | FileScpSpec)
or spec.type == "bfabric_annotation"
or isinstance(spec, BfabricOrderFastaSpec)
):
return IntegrityState.NotChecked
else:
raise ValueError(f"Unsupported spec type: {type(spec)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from bfabric_app_runner.input_preparation.collect_annotation import prepare_annotation
from bfabric_app_runner.input_preparation.integrity import IntegrityState
from bfabric_app_runner.input_preparation.list_inputs import list_input_states
from bfabric_app_runner.input_preparation.prepare_file_spec import prepare_file_spec
from bfabric_app_runner.specs.inputs.bfabric_dataset_spec import BfabricDatasetSpec
from bfabric_app_runner.specs.inputs.bfabric_order_fasta_spec import BfabricOrderFastaSpec
from bfabric_app_runner.specs.inputs.bfabric_resource_spec import BfabricResourceSpec
from bfabric_app_runner.specs.inputs.file_copy_spec import FileSpec
from bfabric_app_runner.specs.inputs.file_scp_spec import FileScpSpec
from bfabric_app_runner.specs.inputs_spec import (
InputSpecType,
Expand Down Expand Up @@ -40,6 +42,8 @@ def prepare_all(self, specs: list[InputSpecType]) -> None:
logger.debug(f"Skipping {spec} as it already exists and passed integrity check")
elif isinstance(spec, BfabricResourceSpec):
self.prepare_resource(spec)
elif isinstance(spec, FileSpec):
self.prepare_file_spec(spec)
elif isinstance(spec, FileScpSpec):
self.prepare_file_scp(spec)
elif isinstance(spec, BfabricDatasetSpec):
Expand Down Expand Up @@ -87,6 +91,9 @@ def prepare_resource(self, spec: BfabricResourceSpec) -> None:
if actual_checksum != resource["filechecksum"]:
raise ValueError(f"Checksum mismatch: expected {resource['filechecksum']}, got {actual_checksum}")

def prepare_file_spec(self, spec: FileSpec) -> None:
    """Prepares the file described by ``spec`` by delegating to the module-level ``prepare_file_spec``.

    The client, working directory and ssh user stored on this instance are forwarded unchanged.
    """
    return prepare_file_spec(
        spec=spec,
        client=self._client,
        working_dir=self._working_dir,
        ssh_user=self._ssh_user,
    )

def prepare_file_scp(self, spec: FileScpSpec) -> None:
scp_uri = f"{spec.host}:{spec.absolute_path}"
result_name = spec.resolve_filename(client=self._client)
Expand All @@ -102,21 +109,29 @@ def prepare_dataset(self, spec: BfabricDatasetSpec) -> None:
dataset.write_csv(path=target_path, separator=spec.separator)

def prepare_order_fasta(self, spec: BfabricOrderFastaSpec) -> None:
# Determine the result file.
result_name = self._working_dir / spec.filename
result_name.parent.mkdir(exist_ok=True, parents=True)

# Find the order.
match spec.entity:
case "workunit":
workunit = Workunit.find(id=spec.id, client=self._client)
if not isinstance(workunit.container, Order):
raise ValueError(f"Workunit {workunit.id} is not associated with an order")
msg = f"Workunit {workunit.id} is not associated with an order"
if spec.required:
raise ValueError(msg)
else:
logger.warning(msg)
result_name.write_text("")
return
order = workunit.container
case "order":
order = Order.find(id=spec.id, client=self._client)
case _:
assert_never(spec.entity)

# Write the result into the file
result_name = self._working_dir / spec.filename
result_name.parent.mkdir(exist_ok=True, parents=True)
fasta_content = order.data_dict.get("fastasequence", "")
if fasta_content and fasta_content[-1] != "\n":
fasta_content += "\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import shlex
import shutil
import subprocess
from pathlib import Path
from shutil import SameFileError
from subprocess import CalledProcessError
from typing import assert_never

from loguru import logger

from bfabric import Bfabric
from bfabric_app_runner.specs.inputs.file_copy_spec import (
FileSpec,
FileSourceSsh,
FileSourceLocal,
FileSourceSshValue,
)
from bfabric_app_runner.util.scp import scp


def prepare_file_spec(spec: FileSpec, client: Bfabric, working_dir: Path, ssh_user: str | None) -> None:
    """Prepares the file specified by the spec inside ``working_dir``.

    The target name is obtained from ``spec.resolve_filename`` and missing parent directories are created.
    If ``spec.link`` is set a symbolic link is created, otherwise the file is copied: rsync is tried first and
    the plain copy operation is only attempted when rsync did not succeed.

    :param spec: the file spec to prepare
    :param client: the B-Fabric client, passed to ``spec.resolve_filename``
    :param working_dir: directory relative to which the target file is placed
    :param ssh_user: optional user name forwarded to the copy operations
    :raises RuntimeError: if none of the attempted operations reported success
    """
    target = working_dir / spec.resolve_filename(client=client)
    target.parent.mkdir(parents=True, exist_ok=True)

    if spec.link:
        succeeded = _operation_link_symbolic(spec, target)
    else:
        # short-circuit: the fallback copy only runs if rsync reported failure
        succeeded = _operation_copy_rsync(spec, target, ssh_user) or _operation_copy(spec, target, ssh_user)

    if not succeeded:
        raise RuntimeError(f"Failed to copy file: {spec}")


def _operation_copy_rsync(spec: FileSpec, output_path: Path, ssh_user: str | None) -> bool:
match spec.source:
case FileSourceLocal(local=local):
source_str = str(Path(local).resolve())
case FileSourceSsh(ssh=FileSourceSshValue(host=host, path=path)):
source_str = f"{ssh_user}@{host}:{path}" if ssh_user else f"{host}:{path}"
case _:
assert_never(spec.source)
cmd = ["rsync", "-Pav", source_str, str(output_path)]
logger.info(shlex.join(cmd))
result = subprocess.run(cmd, check=False)
return result.returncode == 0


def _operation_copy(spec: FileSpec, output_path: Path, ssh_user: str | None) -> bool:
    """Copies the source file of ``spec`` to ``output_path``, choosing the method by source kind.

    Local sources are handled by ``_operation_copy_cp`` and ssh sources by ``_operation_copy_scp``.

    :return: the success flag returned by the selected copy operation
    """
    source = spec.source
    if isinstance(source, FileSourceLocal):
        return _operation_copy_cp(spec, output_path)
    if isinstance(source, FileSourceSsh):
        return _operation_copy_scp(spec, output_path, ssh_user)
    assert_never(source)


def _operation_copy_scp(spec: FileSpec, output_path: Path, ssh_user: str | None) -> bool:
    """Copies the ssh source file of ``spec`` to ``output_path`` using the ``scp`` helper.

    :return: True if the helper completed, False if it raised ``CalledProcessError``
    """
    remote = spec.source.ssh
    try:
        scp(source=f"{remote.host}:{remote.path}", target=output_path, user=ssh_user)
    except CalledProcessError:
        return False
    else:
        return True


def _operation_copy_cp(spec: FileSpec, output_path: Path) -> bool:
    """Copies the local source file of ``spec`` to ``output_path`` with ``shutil.copyfile``.

    :param spec: the file spec, whose source is expected to be local (``spec.source.local`` is read)
    :param output_path: destination path of the copy
    :return: True if the copy succeeded, False if it raised an ``OSError``
    """
    source_path = str(Path(spec.source.local).resolve())
    target_path = str(output_path)
    logger.info(shlex.join(["cp", source_path, target_path]))
    try:
        shutil.copyfile(source_path, target_path)
    except OSError as error:
        # SameFileError derives from OSError, so source == target is covered here as well.
        # Log the reason instead of discarding it, the caller only sees the boolean result.
        logger.warning(f"cp failed: {error}")
        return False
    return True


def _operation_link_symbolic(spec: FileSpec, output_path: Path) -> bool:
    """Creates a symbolic link at ``output_path`` that points to the local source file of ``spec``.

    If ``output_path`` already is a symlink to the source file nothing is done; a symlink that points
    somewhere else is removed and recreated.

    :param spec: the file spec, whose source is expected to be local (``spec.source.local`` is read)
    :param output_path: path at which the link is created
    :return: True if a correct link already exists or ``ln -s`` exited with status 0, False otherwise
    :raises RuntimeError: if ``output_path`` exists and is not a symlink
    """
    source_abs = Path(spec.source.local).resolve()
    # Resolve only the directory that contains the link. Resolving ``output_path`` itself would follow an
    # already existing link to its target and yield the target's directory as the base.
    link_dir = output_path.parent.resolve()
    # the link is created relative to the output file, so it should be more portable across apptainer images etc
    source_path = source_abs.relative_to(link_dir, walk_up=True)

    # if the file exists, and only if it is a link as well
    if output_path.is_symlink():
        # check if it points to the same file, in which case we don't need to do anything
        # (compare with the absolute source; the relative path would be resolved against the current directory)
        if output_path.resolve() == source_abs:
            logger.info("Link already exists and points to the correct file")
            return True
        else:
            logger.info(f"rm {output_path}")
            output_path.unlink()
    elif output_path.exists():
        raise RuntimeError(f"Output path already exists and is not a symlink: {output_path}")
    cmd = ["ln", "-s", str(source_path), str(output_path)]
    logger.info(shlex.join(cmd))
    result = subprocess.run(cmd, check=False)
    return result.returncode == 0
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def available_versions(self) -> set[str]:
"""The available versions of the app."""
return {version.version for version in self.versions}

def __contains__(self, version: str) -> bool:
    """Returns whether ``version`` is one of the available versions of the app."""
    known_versions = self.available_versions
    return version in known_versions

def __getitem__(self, version: str) -> AppVersion | None:
"""Returns the app version with the provided version number or None if it does not exist."""
for app_version in self.versions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@

from pydantic import Field

AbsoluteFilePath = Annotated[str, Field(pattern=r"^/[^:]*$")]
"""Absolute file path, excluding ":" characters."""

# The first character class excludes ":" as well as "/"; otherwise a value such as ":x" would be accepted.
RelativeFilePath = Annotated[str, Field(pattern=r"^[^/:][^:]*$")]
"""Relative file path, excluding absolute paths and ":" characters."""
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class BfabricOrderFastaSpec(BaseModel):
id: int
entity: Literal["workunit", "order"]
filename: RelativeFilePath
required: bool = False

def resolve_filename(self, client: Bfabric) -> str:
    """Returns the configured ``filename`` unchanged; ``client`` is not used."""
    target_name = self.filename
    return target_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

from typing import Literal, TYPE_CHECKING, Self

from pydantic import BaseModel, model_validator

from bfabric_app_runner.specs.common_types import RelativeFilePath, AbsoluteFilePath # noqa: TC001

if TYPE_CHECKING:
from bfabric import Bfabric


class FileSourceLocal(BaseModel):
    # File source given as an absolute path on the local file system.
    local: AbsoluteFilePath

    def get_filename(self) -> str:
        """Returns the part of the local path after the last "/"."""
        _, _, basename = self.local.rpartition("/")
        return basename


class FileSourceSshValue(BaseModel):
    # Host and absolute path identifying a file that is reached over ssh.
    host: str
    path: AbsoluteFilePath


class FileSourceSsh(BaseModel):
    # File source given as a host/path pair under the "ssh" key.
    ssh: FileSourceSshValue

    def get_filename(self) -> str:
        """Returns the part of the remote path after the last "/"."""
        _, _, basename = self.ssh.path.rpartition("/")
        return basename


class FileSpec(BaseModel):
    # Input spec of type "file": a local or ssh source, an optional target filename,
    # and a flag selecting a link instead of a copy.
    type: Literal["file"] = "file"
    source: FileSourceSsh | FileSourceLocal
    filename: RelativeFilePath | None = None
    link: bool = False

    @model_validator(mode="after")
    def validate_no_link_ssh(self) -> Self:
        """Rejects specs that request a link while the source is a ssh source."""
        wants_remote_link = self.link and isinstance(self.source, FileSourceSsh)
        if wants_remote_link:
            raise ValueError("Cannot link to a remote file.")
        return self

    def resolve_filename(self, client: Bfabric) -> str:
        """Returns ``filename`` if it is set and non-empty, otherwise the name derived from the source path.

        ``client`` is not used by this implementation.
        """
        return self.filename or self.source.get_filename()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bfabric import Bfabric


# TODO(leo): deprecate later
class FileScpSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["file_scp"] = "file_scp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from bfabric_app_runner.specs.inputs.bfabric_dataset_spec import BfabricDatasetSpec
from bfabric_app_runner.specs.inputs.bfabric_order_fasta_spec import BfabricOrderFastaSpec
from bfabric_app_runner.specs.inputs.bfabric_resource_spec import BfabricResourceSpec
from bfabric_app_runner.specs.inputs.file_copy_spec import FileSpec
from bfabric_app_runner.specs.inputs.file_scp_spec import FileScpSpec

if TYPE_CHECKING:
from pathlib import Path
from bfabric import Bfabric

InputSpecType = Annotated[
BfabricResourceSpec | FileScpSpec | BfabricDatasetSpec | BfabricOrderFastaSpec | BfabricAnnotationSpec,
BfabricResourceSpec | FileSpec | FileScpSpec | BfabricDatasetSpec | BfabricOrderFastaSpec | BfabricAnnotationSpec,
Field(discriminator="type"),
]

Expand Down
6 changes: 4 additions & 2 deletions bfabric_app_runner/src/bfabric_app_runner/util/scp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import shlex
import subprocess
from pathlib import Path

Expand Down Expand Up @@ -38,8 +39,9 @@ def scp(source: str | Path, target: str | Path, *, user: str | None = None, mkdi
parent_path = Path(target).parent
parent_path.mkdir(parents=True, exist_ok=True)

logger.info(f"scp {source} {target}")
subprocess.run(["scp", source, target], check=True)
cmd = ["scp", source, target]
logger.info(shlex.join(cmd))
subprocess.run(cmd, check=True)


def _is_remote(path: str | Path) -> bool:
Expand Down
Loading

0 comments on commit 58f1bcc

Please sign in to comment.