From 02e62c93f6dabcf8b1b2a53c77dc846988723688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Wed, 5 Feb 2025 15:40:21 +0100 Subject: [PATCH] Fixup: use digest instead of hash --- dangerzone/updater/cli.py | 4 +- dangerzone/updater/registry.py | 23 ++++----- dangerzone/updater/signatures.py | 86 ++++++++++++++++---------------- 3 files changed, 56 insertions(+), 57 deletions(-) diff --git a/dangerzone/updater/cli.py b/dangerzone/updater/cli.py index f4ec2ac..c243bb9 100644 --- a/dangerzone/updater/cli.py +++ b/dangerzone/updater/cli.py @@ -29,9 +29,9 @@ def main(debug: bool) -> None: @click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION) def upgrade(image: str, pubkey: str) -> None: """Upgrade the image to the latest signed version.""" - manifest_hash = registry.get_manifest_hash(image) + manifest_digest = registry.get_manifest_digest(image) try: - is_upgraded = signatures.upgrade_container_image(image, manifest_hash, pubkey) + is_upgraded = signatures.upgrade_container_image(image, manifest_digest, pubkey) if is_upgraded: click.echo(f"✅ The local image {image} has been upgraded") click.echo(f"✅ The image has been signed with {pubkey}") diff --git a/dangerzone/updater/registry.py b/dangerzone/updater/registry.py index 28dca1e..b72d417 100644 --- a/dangerzone/updater/registry.py +++ b/dangerzone/updater/registry.py @@ -1,6 +1,6 @@ -import hashlib import re from collections import namedtuple +from hashlib import sha256 from typing import Dict, Optional, Tuple import requests @@ -8,7 +8,7 @@ import requests from . import errors, log __all__ = [ - "get_manifest_hash", + "get_manifest_digest", "list_tags", "get_manifest", "parse_image_location", @@ -28,9 +28,7 @@ ACCEPT_MANIFESTS_HEADER = ",".join( ) -class Image( - namedtuple("Image", ["registry", "namespace", "image_name", "tag", "digest"]) -): +class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])): __slots__ = () @property @@ -101,7 +99,8 @@ class RegistryClient: return tags def get_manifest( - self, tag: str, + self, + tag: str, ) -> requests.Response: """Get manifest information for a specific tag""" manifest_url = f"{self._image_url}/manifests/{tag}" @@ -123,8 +122,8 @@ class RegistryClient: .get("manifests") ) - def get_blob(self, hash: str) -> requests.Response: - url = f"{self._image_url}/blobs/{hash}" + def get_blob(self, digest: str) -> requests.Response: + url = f"{self._image_url}/blobs/{digest}" response = requests.get( url, headers={ @@ -134,19 +133,19 @@ class RegistryClient: response.raise_for_status() return response - def get_manifest_hash( + def get_manifest_digest( self, tag: str, tag_manifest_content: Optional[bytes] = None ) -> str: if not tag_manifest_content: tag_manifest_content = self.get_manifest(tag).content - return hashlib.sha256(tag_manifest_content).hexdigest() + return sha256(tag_manifest_content).hexdigest() # XXX Refactor this with regular functions rather than a class -def get_manifest_hash(image_str: str) -> str: +def get_manifest_digest(image_str: str) -> str: image = parse_image_location(image_str) - return RegistryClient(image).get_manifest_hash(image.tag) + return RegistryClient(image).get_manifest_digest(image.tag) def list_tags(image_str: str) -> list: diff --git a/dangerzone/updater/signatures.py b/dangerzone/updater/signatures.py index 65f855a..e60f9f9 100644 --- a/dangerzone/updater/signatures.py +++ b/dangerzone/updater/signatures.py @@ -55,7 +55,7 @@ def signature_to_bundle(sig: Dict) -> Dict: } -def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool: +def verify_signature(signature: dict, image_digest: str, pubkey: str) -> bool: """Verify a signature against a given public key""" # XXX - Also verfy the identity/docker-reference field against the expected value # e.g. ghcr.io/freedomofpress/dangerzone/dangerzone @@ -64,12 +64,12 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool: signature_bundle = signature_to_bundle(signature) payload_bytes = b64decode(signature_bundle["Payload"]) - payload_hash = json.loads(payload_bytes)["critical"]["image"][ + payload_digest = json.loads(payload_bytes)["critical"]["image"][ "docker-manifest-digest" ] - if payload_hash != f"sha256:{image_hash}": + if payload_digest != f"sha256:{image_digest}": raise errors.SignatureMismatch( - f"The signature does not match the image hash ({payload_hash}, {image_hash})" + f"The signature does not match the image digest ({payload_digest}, {image_digest})" ) with ( @@ -103,44 +103,44 @@ def verify_signature(signature: dict, image_hash: str, pubkey: str) -> bool: return False -def new_image_release(image: str) -> bool: - remote_hash = registry.get_manifest_hash(image) - local_hash = runtime.get_local_image_digest(image) - log.debug("Remote hash: %s", remote_hash) - log.debug("Local hash: %s", local_hash) - return remote_hash != local_hash +def is_update_available(image: str) -> bool: + remote_digest = registry.get_manifest_digest(image) + local_digest = runtime.get_local_image_digest(image) + log.debug("Remote digest: %s", remote_digest) + log.debug("Local digest: %s", local_digest) + return remote_digest != local_digest def verify_signatures( signatures: List[Dict], - image_hash: str, + image_digest: str, pubkey: str, ) -> bool: for signature in signatures: - if not verify_signature(signature, image_hash, pubkey): + if not verify_signature(signature, image_digest, pubkey): raise errors.SignatureVerificationError() return True -def upgrade_container_image(image: str, manifest_hash: str, pubkey: str) -> bool: +def upgrade_container_image(image: str, manifest_digest: str, pubkey: str) -> bool: """Verify and upgrade the image to the latest, if signed.""" - if not new_image_release(image): + if not is_update_available(image): raise errors.ImageAlreadyUpToDate("The image is already up to date") - signatures = get_remote_signatures(image, manifest_hash) - verify_signatures(signatures, manifest_hash, pubkey) + signatures = get_remote_signatures(image, manifest_digest) + verify_signatures(signatures, manifest_digest, pubkey) # At this point, the signatures are verified # We store the signatures just now to avoid storing unverified signatures - store_signatures(signatures, manifest_hash, pubkey) + store_signatures(signatures, manifest_digest, pubkey) # let's upgrade the image # XXX Use the image digest here to avoid race conditions return runtime.container_pull(image) -def _get_blob(tmpdir: str, hash: str) -> Path: - return Path(tmpdir) / "blobs" / "sha256" / hash.replace("sha256:", "") +def _get_blob(tmpdir: str, digest: str) -> Path: + return Path(tmpdir) / "blobs" / "sha256" / digest.replace("sha256:", "") def upgrade_container_image_airgapped(container_tar: str, pubkey: str) -> str: @@ -241,8 +241,8 @@ def convert_oci_images_signatures( return image_name, signatures -def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) -> str: - """Get the sha256 hash of a file or content""" +def get_file_digest(file: Optional[str] = None, content: Optional[bytes] = None) -> str: + """Get the sha256 digest of a file or content""" if not file and not content: raise errors.UpdaterError("No file or content provided") if file: @@ -253,13 +253,13 @@ def get_file_hash(file: Optional[str] = None, content: Optional[bytes] = None) - return "" -def load_signatures(image_hash: str, pubkey: str) -> List[Dict]: +def load_signatures(image_digest: str, pubkey: str) -> List[Dict]: """ Load signatures from the local filesystem See store_signatures() for the expected format. """ - pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) + pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey) if not pubkey_signatures.exists(): msg = ( f"Cannot find a '{pubkey_signatures}' folder." @@ -267,19 +267,19 @@ def load_signatures(image_hash: str, pubkey: str) -> List[Dict]: ) raise errors.SignaturesFolderDoesNotExist(msg) - with open(pubkey_signatures / f"{image_hash}.json") as f: + with open(pubkey_signatures / f"{image_digest}.json") as f: log.debug("Loading signatures from %s", f.name) return json.load(f) -def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> None: +def store_signatures(signatures: list[Dict], image_digest: str, pubkey: str) -> None: """ Store signatures locally in the SIGNATURE_PATH folder, like this: ~/.config/dangerzone/signatures/ - └── - └── .json - └── .json + └── + └── .json + └── .json The format used in the `.json` file is the one of `cosign download signature`, which differs from the "bundle" one used afterwards. @@ -292,22 +292,22 @@ def store_signatures(signatures: list[Dict], image_hash: str, pubkey: str) -> No payload = json.loads(b64decode(sig["Payload"])) return payload["critical"]["image"]["docker-manifest-digest"] - # All the signatures should share the same hash. - hashes = list(map(_get_digest, signatures)) - if len(set(hashes)) != 1: - raise errors.InvalidSignatures("Signatures do not share the same image hash") + # All the signatures should share the same digest. + digests = list(map(_get_digest, signatures)) + if len(set(digests)) != 1: + raise errors.InvalidSignatures("Signatures do not share the same image digest") - if f"sha256:{image_hash}" != hashes[0]: + if f"sha256:{image_digest}" != digests[0]: raise errors.SignatureMismatch( - f"Signatures do not match the given image hash ({image_hash}, {hashes[0]})" + f"Signatures do not match the given image digest ({image_digest}, {digests[0]})" ) - pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey) + pubkey_signatures = SIGNATURES_PATH / get_file_digest(pubkey) pubkey_signatures.mkdir(parents=True, exist_ok=True) - with open(pubkey_signatures / f"{image_hash}.json", "w") as f: + with open(pubkey_signatures / f"{image_digest}.json", "w") as f: log.info( - f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json" + f"Storing signatures for {image_digest} in {pubkey_signatures}/{image_digest}.json" ) json.dump(signatures, f) @@ -318,28 +318,28 @@ def verify_local_image(image: str, pubkey: str) -> bool: """ log.info(f"Verifying local image {image} against pubkey {pubkey}") try: - image_hash = runtime.get_local_image_digest(image) + image_digest = runtime.get_local_image_digest(image) except subprocess.CalledProcessError: raise errors.ImageNotFound(f"The image {image} does not exist locally") - log.debug(f"Image hash: {image_hash}") - signatures = load_signatures(image_hash, pubkey) + log.debug(f"Image digest: {image_digest}") + signatures = load_signatures(image_digest, pubkey) if len(signatures) < 1: raise errors.LocalSignatureNotFound("No signatures found") for signature in signatures: - if not verify_signature(signature, image_hash, pubkey): + if not verify_signature(signature, image_digest, pubkey): msg = f"Unable to verify signature for {image} with pubkey {pubkey}" raise errors.SignatureVerificationError(msg) return True -def get_remote_signatures(image: str, hash: str) -> List[Dict]: +def get_remote_signatures(image: str, digest: str) -> List[Dict]: """Retrieve the signatures from the registry, via `cosign download`.""" cosign.ensure_installed() process = subprocess.run( - ["cosign", "download", "signature", f"{image}@sha256:{hash}"], + ["cosign", "download", "signature", f"{image}@sha256:{digest}"], capture_output=True, check=True, )