Skip to content

Commit

Permalink
💅 Bundle attestation existence check together
Browse files Browse the repository at this point in the history
This patch moves said check out of the signing loop and performs the
check early in the process. It is then able to report multiple
problems in a single error.
  • Loading branch information
webknjaz committed Dec 10, 2024
1 parent 88a4d03 commit 72d1032
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,47 @@ def collect_dists(packages_dir: Path) -> list[Path]:
return dist_paths


def attest_dist(dist_path: Path, signer: Signer) -> None:
def assert_attestations_do_not_pre_exist(
dist_to_attestation_map: dict[Path, Path],
) -> None:
existing_attestations = {
f'* {dist !s} -> {dist_attestation !s}'
for dist, dist_attestation in dist_to_attestation_map.items()
if dist_attestation.exists()
}
if not existing_attestations:
return

existing_attestations_list = '\n'.join(map(str, existing_attestations))
error_message = (
'The following distributions already have publish attestations:'
f'{existing_attestations_list}',
)
die(error_message)


def compose_attestation_mapping(dist_paths: list[Path]) -> dict[Path, Path]:
dist_to_attestation_map = {
dist_path: dist_path.with_suffix(
f'{dist_path.suffix}.publish.attestation',
)
for dist_path in dist_paths
}

# We are the publishing step, so there should be no pre-existing publish
# attestation. The presence of one indicates user confusion.
attestation_path = Path(f'{dist_path}.publish.attestation')
if attestation_path.exists():
die(f'{dist_path} already has a publish attestation: {attestation_path}')
# Make sure there's no publish attestations on disk.
# We do this up-front to prevent partial signing.
assert_attestations_do_not_pre_exist(dist_to_attestation_map)

return dist_to_attestation_map


def attest_dist(
dist_path: Path,
attestation_path: Path,
signer: Signer,
) -> None:
dist = Distribution.from_file(dist_path)
attestation = Attestation.sign(signer, dist)

Expand All @@ -92,7 +126,9 @@ def get_identity_token() -> IdentityToken:


def main() -> None:
packages_dir = Path(sys.argv[1])
dist_to_attestation_map = compose_attestation_mapping(
collect_dists(Path(sys.argv[1])),
)

try:
identity = get_identity_token()
Expand All @@ -103,12 +139,10 @@ def main() -> None:
# since permissions can't be to blame at this stage.
die(_TOKEN_RETRIEVAL_FAILED_MESSAGE.format(identity_error=identity_error))

dist_paths = collect_dists(packages_dir)

with SigningContext.production().signer(identity, cache=True) as s:
debug(f'attesting to dists: {dist_paths}')
for dist_path in dist_paths:
attest_dist(dist_path, s)
with SigningContext.production().signer(identity, cache=True) as signer:
debug(f'attesting to dists: {dist_to_attestation_map.keys()}')
for dist_path, attestation_path in dist_to_attestation_map.items():
attest_dist(dist_path, attestation_path, signer)


if __name__ == '__main__':
Expand Down

0 comments on commit 72d1032

Please sign in to comment.