Skip to content

Commit

Permalink
New file input (#134)
Browse files Browse the repository at this point in the history
This implements a new input spec, `file`, which replaces `file_scp`, preserves timestamps whenever possible, and allows creating symlinks instead of copying the file when needed.
  • Loading branch information
leoschwarz authored Feb 3, 2025
1 parent 578069c commit f3e93f6
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 8 deletions.
7 changes: 3 additions & 4 deletions bfabric_app_runner/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## \[Unreleased\]

## \[0.0.15\] - 2025-02-03

### Changed
### Added

- Prepare for PyPI release, major rename.
- New input type `file` which replaces `file_scp`, preserves timestamps whenever possible, and allows creating
symlinks instead of copying the file, as needed.

## \[0.0.14\] - 2025-01-30

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from enum import Enum

from bfabric_app_runner.specs.inputs.bfabric_order_fasta_spec import BfabricOrderFastaSpec
from bfabric_app_runner.specs.inputs.file_copy_spec import FileSpec
from bfabric_app_runner.specs.inputs.file_scp_spec import FileScpSpec
from bfabric.entities import Resource, Dataset
from bfabric_app_runner.specs.inputs.bfabric_dataset_spec import BfabricDatasetSpec # noqa: TC001
Expand Down Expand Up @@ -39,7 +40,11 @@ def check_integrity(spec: InputSpecType, local_path: Path, client: Bfabric) -> I
return _check_resource_spec(spec, local_path, client)
elif isinstance(spec, BfabricDatasetSpec):
return _check_dataset_spec(spec, local_path, client)
elif isinstance(spec, FileScpSpec) or spec.type == "bfabric_annotation" or isinstance(spec, BfabricOrderFastaSpec):
elif (
isinstance(spec, FileSpec | FileScpSpec)
or spec.type == "bfabric_annotation"
or isinstance(spec, BfabricOrderFastaSpec)
):
return IntegrityState.NotChecked
else:
raise ValueError(f"Unsupported spec type: {type(spec)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from bfabric_app_runner.input_preparation.collect_annotation import prepare_annotation
from bfabric_app_runner.input_preparation.integrity import IntegrityState
from bfabric_app_runner.input_preparation.list_inputs import list_input_states
from bfabric_app_runner.input_preparation.prepare_file_spec import prepare_file_spec
from bfabric_app_runner.specs.inputs.bfabric_dataset_spec import BfabricDatasetSpec
from bfabric_app_runner.specs.inputs.bfabric_order_fasta_spec import BfabricOrderFastaSpec
from bfabric_app_runner.specs.inputs.bfabric_resource_spec import BfabricResourceSpec
from bfabric_app_runner.specs.inputs.file_copy_spec import FileSpec
from bfabric_app_runner.specs.inputs.file_scp_spec import FileScpSpec
from bfabric_app_runner.specs.inputs_spec import (
InputSpecType,
Expand Down Expand Up @@ -40,6 +42,8 @@ def prepare_all(self, specs: list[InputSpecType]) -> None:
logger.debug(f"Skipping {spec} as it already exists and passed integrity check")
elif isinstance(spec, BfabricResourceSpec):
self.prepare_resource(spec)
elif isinstance(spec, FileSpec):
self.prepare_file_spec(spec)
elif isinstance(spec, FileScpSpec):
self.prepare_file_scp(spec)
elif isinstance(spec, BfabricDatasetSpec):
Expand Down Expand Up @@ -87,6 +91,9 @@ def prepare_resource(self, spec: BfabricResourceSpec) -> None:
if actual_checksum != resource["filechecksum"]:
raise ValueError(f"Checksum mismatch: expected {resource['filechecksum']}, got {actual_checksum}")

def prepare_file_spec(self, spec: FileSpec) -> None:
    """Prepares a `file` input by delegating to the module-level `prepare_file_spec` function.

    The client, working directory and SSH user are taken from this instance.
    """
    return prepare_file_spec(
        spec=spec,
        client=self._client,
        working_dir=self._working_dir,
        ssh_user=self._ssh_user,
    )

def prepare_file_scp(self, spec: FileScpSpec) -> None:
scp_uri = f"{spec.host}:{spec.absolute_path}"
result_name = spec.resolve_filename(client=self._client)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import shlex
import shutil
import subprocess
from pathlib import Path
from shutil import SameFileError
from subprocess import CalledProcessError
from typing import assert_never

from loguru import logger

from bfabric import Bfabric
from bfabric_app_runner.specs.inputs.file_copy_spec import (
FileSpec,
FileSourceSsh,
FileSourceLocal,
FileSourceSshValue,
)
from bfabric_app_runner.util.scp import scp


def prepare_file_spec(spec: FileSpec, client: Bfabric, working_dir: Path, ssh_user: str | None) -> None:
    """Prepares the file specified by the spec inside the working directory.

    The target is `working_dir / spec.resolve_filename(...)`; missing parent directories are created.
    If `spec.link` is set, a symbolic link is created. Otherwise the file is copied, first with rsync
    and, should that fail, with the plain copy fallback.

    Raises:
        RuntimeError: If the selected operation (or both copy attempts) reported failure.
    """
    target = working_dir / spec.resolve_filename(client=client)
    target.parent.mkdir(exist_ok=True, parents=True)

    if spec.link:
        succeeded = _operation_link_symbolic(spec, target)
    else:
        # The fallback is only evaluated when rsync did not succeed.
        succeeded = _operation_copy_rsync(spec, target, ssh_user) or _operation_copy(spec, target, ssh_user)

    if not succeeded:
        raise RuntimeError(f"Failed to copy file: {spec}")


def _operation_copy_rsync(spec: FileSpec, output_path: Path, ssh_user: str | None) -> bool:
match spec.source:
case FileSourceLocal(local=local):
source_str = str(Path(local).resolve())
case FileSourceSsh(ssh=FileSourceSshValue(host=host, path=path)):
source_str = f"{ssh_user}@{host}:{path}" if ssh_user else f"{host}:{path}"
case _:
assert_never(spec.source)
cmd = ["rsync", "-Pav", source_str, str(output_path)]
logger.info(shlex.join(cmd))
result = subprocess.run(cmd, check=False)
return result.returncode == 0


def _operation_copy(spec: FileSpec, output_path: Path, ssh_user: str | None) -> bool:
    """Copies the source file without rsync, choosing the method by source kind.

    Local sources are handled by `_operation_copy_cp`, SSH sources by `_operation_copy_scp`.
    Returns whatever the selected operation returns.
    """
    source = spec.source
    if isinstance(source, FileSourceLocal):
        return _operation_copy_cp(spec, output_path)
    if isinstance(source, FileSourceSsh):
        return _operation_copy_scp(spec, output_path, ssh_user)
    # Every member of the source union is handled above.
    assert_never(source)


def _operation_copy_scp(spec: FileSpec, output_path: Path, ssh_user: str | None) -> bool:
    """Copies an SSH source to `output_path` with the `scp` helper.

    Returns:
        False if the helper raised `CalledProcessError`, True otherwise.
    """
    remote = spec.source.ssh
    remote_location = f"{remote.host}:{remote.path}"
    try:
        scp(source=remote_location, target=output_path, user=ssh_user)
    except CalledProcessError:
        return False
    else:
        return True


def _operation_copy_cp(spec: FileSpec, output_path: Path) -> bool:
    """Copies a local source file to `output_path` using the standard library.

    The file content is copied with `shutil.copyfile`. Afterwards the source's metadata (including
    timestamps) is copied on a best-effort basis, so that this fallback preserves timestamps
    whenever possible.

    Returns:
        True if the content was copied, False if copying the content raised an `OSError`
        (which includes `SameFileError`).
    """
    source = str(Path(spec.source.local).resolve())
    target = str(output_path)
    logger.info(shlex.join(["cp", source, target]))
    try:
        shutil.copyfile(source, target)
    except (OSError, SameFileError):
        return False
    try:
        shutil.copystat(source, target)
    except OSError as error:
        # The content is already in place; failing to carry over metadata should not fail the copy.
        logger.warning(f"Could not preserve file metadata for {target}: {error}")
    return True


def _operation_link_symbolic(spec: FileSpec, output_path: Path) -> bool:
    """Creates a relative symbolic link at `output_path` pointing to the local source file.

    If `output_path` already is a symlink that resolves to the source file, nothing is done. A symlink
    pointing elsewhere is removed and recreated.

    Returns:
        True if the link already was correct or `ln -s` exited with status 0, False otherwise.

    Raises:
        RuntimeError: If `output_path` exists and is not a symlink.
    """
    target_path = Path(spec.source.local).resolve()
    # Resolve only the directory containing the link: resolving `output_path` itself would follow an
    # already existing symlink and yield the directory of its old target instead.
    link_dir = output_path.parent.resolve()

    # the link is created relative to the output file, so it should be more portable across apptainer images etc
    source_path = target_path.relative_to(link_dir, walk_up=True)

    # an existing file is only replaced if it is a link as well
    if output_path.is_symlink():
        # check if it points to the same file, in which case we don't need to do anything
        # (compare against the absolute target; the relative path would be resolved against the cwd)
        if output_path.resolve() == target_path:
            logger.info("Link already exists and points to the correct file")
            return True
        else:
            logger.info(f"rm {output_path}")
            output_path.unlink()
    elif output_path.exists():
        raise RuntimeError(f"Output path already exists and is not a symlink: {output_path}")
    cmd = ["ln", "-s", str(source_path), str(output_path)]
    logger.info(shlex.join(cmd))
    result = subprocess.run(cmd, check=False)
    return result.returncode == 0
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@

from pydantic import Field

# String types constrained by a regex `pattern`; both patterns reject any ":" in the value.
# AbsoluteFilePath must start with "/".
AbsoluteFilePath = Annotated[str, Field(pattern=r"^/[^:]*$")]
"""Absolute file path, excluding ":" characters."""

# RelativeFilePath must be non-empty and must not start with "/".
RelativeFilePath = Annotated[str, Field(pattern=r"^[^/][^:]*$")]
"""Relative file path, excluding absolute paths and ":" characters."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

from typing import Literal, TYPE_CHECKING, Self

from pydantic import BaseModel, model_validator

from bfabric_app_runner.specs.common_types import RelativeFilePath, AbsoluteFilePath # noqa: TC001

if TYPE_CHECKING:
from bfabric import Bfabric


class FileSourceLocal(BaseModel):
    """File source given as an absolute path in the `local` field."""

    local: AbsoluteFilePath

    def get_filename(self) -> str:
        """Returns the part of `local` after its last "/"."""
        _, _, name = self.local.rpartition("/")
        return name


class FileSourceSshValue(BaseModel):
    """Host and absolute path identifying a file that is reachable over SSH."""

    # Host part of the `host:path` source string passed to rsync/scp.
    host: str
    # Absolute path of the file on that host.
    path: AbsoluteFilePath


class FileSourceSsh(BaseModel):
    """File source given as an SSH host/path pair in the `ssh` field."""

    ssh: FileSourceSshValue

    def get_filename(self) -> str:
        """Returns the part of the remote path after its last "/"."""
        _, _, name = self.ssh.path.rpartition("/")
        return name


class FileSpec(BaseModel):
    """Input spec of type `file`: a local or SSH source that is copied or, optionally, symlinked."""

    type: Literal["file"] = "file"
    source: FileSourceSsh | FileSourceLocal
    filename: RelativeFilePath | None = None
    link: bool = False

    @model_validator(mode="after")
    def validate_no_link_ssh(self) -> Self:
        """Rejects the combination of `link=True` with an SSH source."""
        wants_remote_link = self.link and isinstance(self.source, FileSourceSsh)
        if wants_remote_link:
            raise ValueError("Cannot link to a remote file.")
        return self

    def resolve_filename(self, client: Bfabric) -> str:
        """Returns `filename` if one is set, otherwise the filename derived from the source.

        The `client` argument is not used by this spec type.
        """
        return self.filename or self.source.get_filename()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bfabric import Bfabric


# TODO(leo): deprecate later
class FileScpSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
type: Literal["file_scp"] = "file_scp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from bfabric_app_runner.specs.inputs.bfabric_dataset_spec import BfabricDatasetSpec
from bfabric_app_runner.specs.inputs.bfabric_order_fasta_spec import BfabricOrderFastaSpec
from bfabric_app_runner.specs.inputs.bfabric_resource_spec import BfabricResourceSpec
from bfabric_app_runner.specs.inputs.file_copy_spec import FileSpec
from bfabric_app_runner.specs.inputs.file_scp_spec import FileScpSpec

if TYPE_CHECKING:
from pathlib import Path
from bfabric import Bfabric

InputSpecType = Annotated[
BfabricResourceSpec | FileScpSpec | BfabricDatasetSpec | BfabricOrderFastaSpec | BfabricAnnotationSpec,
BfabricResourceSpec | FileSpec | FileScpSpec | BfabricDatasetSpec | BfabricOrderFastaSpec | BfabricAnnotationSpec,
Field(discriminator="type"),
]

Expand Down
6 changes: 4 additions & 2 deletions bfabric_app_runner/src/bfabric_app_runner/util/scp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import shlex
import subprocess
from pathlib import Path

Expand Down Expand Up @@ -38,8 +39,9 @@ def scp(source: str | Path, target: str | Path, *, user: str | None = None, mkdi
parent_path = Path(target).parent
parent_path.mkdir(parents=True, exist_ok=True)

logger.info(f"scp {source} {target}")
subprocess.run(["scp", source, target], check=True)
cmd = ["scp", source, target]
logger.info(shlex.join(cmd))
subprocess.run(cmd, check=True)


def _is_remote(path: str | Path) -> bool:
Expand Down
Loading

0 comments on commit f3e93f6

Please sign in to comment.